This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 962222e8426 branch-4.0: [Fix](Cloud)decouple min pipeline executor
size from ConnectContext #60958 (#61005)
962222e8426 is described below
commit 962222e842657f175d79740d68aa0b8d168b9aac
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Mar 4 10:21:06 2026 +0800
branch-4.0: [Fix](Cloud)decouple min pipeline executor size from
ConnectContext #60958 (#61005)
#60958
---
.../doris/cloud/system/CloudSystemInfoService.java | 12 +----
.../apache/doris/datasource/ExternalScanNode.java | 12 +++--
.../apache/doris/datasource/FileQueryScanNode.java | 13 +++--
.../org/apache/doris/datasource/FileScanNode.java | 12 +++--
.../doris/source/RemoteDorisScanNode.java | 6 ++-
.../doris/datasource/es/source/EsScanNode.java | 5 +-
.../doris/datasource/hive/source/HiveScanNode.java | 13 ++---
.../doris/datasource/hudi/source/HudiScanNode.java | 6 ++-
.../iceberg/rewrite/RewriteGroupTask.java | 3 +-
.../datasource/iceberg/source/IcebergScanNode.java | 13 +++--
.../doris/datasource/jdbc/source/JdbcScanNode.java | 16 ++++--
.../lakesoul/source/LakeSoulScanNode.java | 7 +--
.../maxcompute/source/MaxComputeScanNode.java | 13 +++--
.../doris/datasource/odbc/source/OdbcScanNode.java | 11 ++--
.../datasource/paimon/source/PaimonScanNode.java | 10 ++--
.../source/TrinoConnectorScanNode.java | 7 +--
.../datasource/tvf/source/MetadataScanNode.java | 6 ++-
.../doris/datasource/tvf/source/TVFScanNode.java | 8 +--
.../org/apache/doris/nereids/cost/CostModel.java | 3 +-
.../glue/translator/PhysicalPlanTranslator.java | 43 +++++++++-------
.../glue/translator/PlanTranslatorContext.java | 21 ++++++++
.../nereids/load/NereidsLoadingTaskPlanner.java | 6 ++-
.../nereids/load/NereidsStreamLoadPlanner.java | 6 ++-
.../properties/ChildrenPropertiesRegulator.java | 7 +--
.../doris/nereids/rules/rewrite/SaltJoin.java | 4 +-
.../planner/BackendPartitionedSchemaScanNode.java | 5 +-
.../java/org/apache/doris/planner/CTEScanNode.java | 4 +-
.../org/apache/doris/planner/DataGenScanNode.java | 10 ++--
.../org/apache/doris/planner/FileLoadScanNode.java | 4 +-
.../apache/doris/planner/GroupCommitScanNode.java | 5 +-
.../org/apache/doris/planner/OlapScanNode.java | 9 ++--
.../org/apache/doris/planner/PlanFragment.java | 6 ++-
.../java/org/apache/doris/planner/ScanContext.java | 58 ++++++++++++++++++++++
.../java/org/apache/doris/planner/ScanNode.java | 27 ++++++++--
.../org/apache/doris/planner/SchemaScanNode.java | 5 +-
.../java/org/apache/doris/qe/SessionVariable.java | 37 ++++++++++++--
.../java/org/apache/doris/qe/StmtExecutor.java | 4 +-
.../org/apache/doris/system/SystemInfoService.java | 4 +-
.../tablefunction/DataGenTableValuedFunction.java | 4 +-
.../ExternalFileTableValuedFunction.java | 5 +-
.../GroupCommitTableValuedFunction.java | 4 +-
.../tablefunction/JdbcQueryTableValueFunction.java | 4 +-
.../tablefunction/MetadataTableValuedFunction.java | 4 +-
.../cloud/system/CloudSystemInfoServiceTest.java | 30 +++++------
.../doris/datasource/FileQueryScanNodeTest.java | 5 +-
.../datasource/hive/source/HiveScanNodeTest.java | 5 +-
.../iceberg/source/IcebergScanNodeTest.java | 3 +-
.../paimon/source/PaimonScanNodeTest.java | 6 ++-
.../tvf/source/MetadataScanNodeTest.java | 7 +--
.../datasource/tvf/source/TVFScanNodeTest.java | 3 +-
.../org/apache/doris/qe/HmsQueryCacheTest.java | 3 +-
.../org/apache/doris/qe/OlapQueryCacheTest.java | 5 +-
.../apache/doris/system/SystemInfoServiceTest.java | 12 ++---
53 files changed, 379 insertions(+), 162 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 8103a9ecfd8..5d62986cff7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -772,16 +772,8 @@ public class CloudSystemInfoService extends
SystemInfoService {
}
@Override
- public int getMinPipelineExecutorSize() {
- String clusterName = "";
- try {
- clusterName = ConnectContext.get().getCloudCluster(false);
- } catch (ComputeGroupException e) {
- LOG.warn("failed to get cluster name", e);
- return 1;
- }
- if (ConnectContext.get() != null
- && Strings.isNullOrEmpty(clusterName)) {
+ public int getMinPipelineExecutorSize(String clusterName) {
+ if (Strings.isNullOrEmpty(clusterName)) {
return 1;
}
List<Backend> currentBackends = getBackendsByClusterName(clusterName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index ac805c33508..8829f1fdbdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
@@ -50,9 +51,14 @@ public abstract class ExternalScanNode extends ScanNode {
? new
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
: new FederationBackendPolicy();
- public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName, StatisticalType statisticalType,
- boolean needCheckColumnPriv) {
- super(id, desc, planNodeName, statisticalType);
+ public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ ScanContext scanContext, boolean needCheckColumnPriv) {
+ this(id, desc, planNodeName, StatisticalType.DEFAULT, scanContext,
needCheckColumnPriv);
+ }
+
+ public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ StatisticalType statisticalType, ScanContext scanContext, boolean
needCheckColumnPriv) {
+ super(id, desc, planNodeName, scanContext, statisticalType);
this.needCheckColumnPriv = needCheckColumnPriv;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index ab039e9e9b6..1dc6902681d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
@@ -105,9 +106,13 @@ public abstract class FileQueryScanNode extends
FileScanNode {
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
- StatisticalType statisticalType, boolean needCheckColumnPriv,
- SessionVariable sv) {
- super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+ ScanContext scanContext, boolean needCheckColumnPriv,
SessionVariable sv) {
+ this(id, desc, planNodeName, StatisticalType.DEFAULT, scanContext,
needCheckColumnPriv, sv);
+ }
+
+ public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ StatisticalType statisticalType, ScanContext scanContext, boolean
needCheckColumnPriv, SessionVariable sv) {
+ super(id, desc, planNodeName, statisticalType, scanContext,
needCheckColumnPriv);
this.sessionVariable = sv;
}
@@ -539,7 +544,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
@Override
public int getNumInstances() {
if (sessionVariable.isIgnoreStorageDataDistribution()) {
- return sessionVariable.getParallelExecInstanceNum();
+ return
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
}
return scanRangeLocations.size();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index bbae44d66a4..cbf2420cead 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
@@ -71,9 +72,14 @@ public abstract class FileScanNode extends ExternalScanNode {
// For display pushdown agg result
protected long tableLevelRowCount = -1;
- public FileScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName, StatisticalType statisticalType,
- boolean needCheckColumnPriv) {
- super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ ScanContext scanContext, boolean needCheckColumnPriv) {
+ this(id, desc, planNodeName, StatisticalType.DEFAULT, scanContext,
needCheckColumnPriv);
+ }
+
+ public FileScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ StatisticalType statisticalType, ScanContext scanContext, boolean
needCheckColumnPriv) {
+ super(id, desc, planNodeName, statisticalType, scanContext,
needCheckColumnPriv);
this.needCheckColumnPriv = needCheckColumnPriv;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
index 520f135baaa..23abb9f0b52 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
@@ -77,8 +78,9 @@ public class RemoteDorisScanNode extends FileQueryScanNode {
private RemoteDorisSource source;
public RemoteDorisScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
- SessionVariable sv) {
- super(id, desc, "REMOTE_DORIS_SCAN_NODE",
StatisticalType.REMOTE_DORIS_SCAN_NODE, needCheckColumnPriv, sv);
+ SessionVariable sv, ScanContext scanContext) {
+ super(id, desc, "REMOTE_DORIS_SCAN_NODE",
StatisticalType.REMOTE_DORIS_SCAN_NODE,
+ scanContext, needCheckColumnPriv, sv);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
index 1cdd351a22a..539efd7b7ca 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
@@ -40,6 +40,7 @@ import
org.apache.doris.datasource.es.QueryBuilders.QueryBuilder;
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RangePartitionPrunerV2;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -83,8 +84,8 @@ public class EsScanNode extends ExternalScanNode {
/**
* For multicatalog es.
**/
- public EsScanNode(PlanNodeId id, TupleDescriptor desc, boolean
esExternalTable) {
- super(id, desc, "EsScanNode", StatisticalType.ES_SCAN_NODE, false);
+ public EsScanNode(PlanNodeId id, TupleDescriptor desc, boolean
esExternalTable, ScanContext scanContext) {
+ super(id, desc, "EsScanNode", StatisticalType.ES_SCAN_NODE,
scanContext, false);
if (esExternalTable) {
EsExternalTable externalTable = (EsExternalTable)
(desc.getTable());
table = externalTable.getEsTable();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 41dc478c38c..4454d48d74c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -48,6 +48,7 @@ import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
@@ -112,14 +113,15 @@ public class HiveScanNode extends FileQueryScanNode {
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
- DirectoryLister directoryLister) {
- this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE,
needCheckColumnPriv, sv, directoryLister);
+ DirectoryLister directoryLister, ScanContext scanContext) {
+ this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE,
+ needCheckColumnPriv, sv, directoryLister, scanContext);
}
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv,
SessionVariable sv,
- DirectoryLister directoryLister) {
- super(id, desc, planNodeName, statisticalType, needCheckColumnPriv,
sv);
+ DirectoryLister directoryLister, ScanContext scanContext) {
+ super(id, desc, planNodeName, statisticalType, scanContext,
needCheckColumnPriv, sv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
this.directoryLister = directoryLister;
@@ -321,7 +323,7 @@ public class HiveScanNode extends FileQueryScanNode {
totalFileNum += fileCacheValue.getFiles().size();
}
}
- int parallelNum = sessionVariable.getParallelExecInstanceNum();
+ int parallelNum =
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
}
@@ -638,4 +640,3 @@ public class HiveScanNode extends FileQueryScanNode {
return compressType;
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 4025deff6c8..0b63d4d539a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -131,8 +132,9 @@ public class HudiScanNode extends HiveScanNode {
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation,
- SessionVariable sv, DirectoryLister directoryLister) {
- super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
needCheckColumnPriv, sv, directoryLister);
+ SessionVariable sv, DirectoryLister directoryLister, ScanContext
scanContext) {
+ super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
+ needCheckColumnPriv, sv, directoryLister, scanContext);
isCowTable = hmsTable.isHoodieCowTable();
if (LOG.isDebugEnabled()) {
if (isCowTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
index cd623528c76..13b7ee3ff68 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
@@ -288,7 +288,8 @@ public class RewriteGroupTask implements
TransientTaskExecutor {
"availableBeCount must be greater than 0 for rewrite task");
// 3. Get default parallelism from session variable (pipeline task num)
- int defaultParallelism =
connectContext.getSessionVariable().getParallelExecInstanceNum();
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int defaultParallelism =
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName);
// 4. Determine strategy based on expected file count
boolean useGather = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 982fe4a7e2c..8b5b4fc205d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -46,6 +46,7 @@ import
org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
@@ -153,8 +154,8 @@ public class IcebergScanNode extends FileQueryScanNode {
// for test
@VisibleForTesting
- public IcebergScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, false, sv);
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv, ScanContext scanContext) {
+ super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, scanContext, false, sv);
}
/**
@@ -163,8 +164,10 @@ public class IcebergScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv, sv);
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE,
+ scanContext, needCheckColumnPriv, sv);
ExternalTable table = (ExternalTable) desc.getTable();
if (table instanceof HMSExternalTable) {
@@ -739,7 +742,7 @@ public class IcebergScanNode extends FileQueryScanNode {
try (CloseableIterable<FileScanTask> fileScanTasks =
planFileScanTask(scan)) {
if (tableLevelPushDownCount) {
int needSplitCnt = countFromSnapshot <
COUNT_WITH_PARALLEL_SPLITS
- ? 1 : sessionVariable.getParallelExecInstanceNum() *
numBackends;
+ ? 1 :
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName()) *
numBackends;
for (FileScanTask next : fileScanTasks) {
splits.add(createIcebergSplit(next));
if (splits.size() >= needSplitCnt) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 793dc352fee..7e5a636455d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.ExternalFunctionRules;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -71,8 +72,8 @@ public class JdbcScanNode extends ExternalScanNode {
private JdbcTable tbl;
private long catalogId;
- public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isJdbcExternalTable) {
- super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false);
+ public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isJdbcExternalTable, ScanContext scanContext) {
+ super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE,
scanContext, false);
if (isJdbcExternalTable) {
JdbcExternalTable jdbcExternalTable = (JdbcExternalTable)
(desc.getTable());
tbl = jdbcExternalTable.getJdbcTable();
@@ -83,8 +84,9 @@ public class JdbcScanNode extends ExternalScanNode {
tableName = tbl.getProperRemoteFullTableName(jdbcType);
}
- public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isTableValuedFunction, String query) {
- super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false);
+ public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean
isTableValuedFunction, String query,
+ ScanContext scanContext) {
+ super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE,
scanContext, false);
this.isTableValuedFunction = isTableValuedFunction;
this.query = query;
tbl = (JdbcTable) desc.getTable();
@@ -275,7 +277,11 @@ public class JdbcScanNode extends ExternalScanNode {
@Override
public int getNumInstances() {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return 1;
+ }
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
private static boolean shouldPushDownConjunct(TOdbcTableType tableType,
Expr expr) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index a5e673a6cd5..a1e287432de 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -83,8 +84,9 @@ public class LakeSoulScanNode extends FileQueryScanNode {
String readType;
- public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE,
needCheckColumnPriv, sv);
+ public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE,
scanContext, needCheckColumnPriv, sv);
}
@Override
@@ -285,4 +287,3 @@ public class LakeSoulScanNode extends FileQueryScanNode {
return splits;
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 502a2c645cd..24c681c8959 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -43,6 +43,7 @@ import
org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -111,15 +112,17 @@ public class MaxComputeScanNode extends FileQueryScanNode
{
// For new planner
public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
SelectedPartitions selectedPartitions, boolean needCheckColumnPriv,
- SessionVariable sv) {
+ SessionVariable sv, ScanContext scanContext) {
this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
- selectedPartitions, needCheckColumnPriv, sv);
+ selectedPartitions, needCheckColumnPriv, sv, scanContext);
}
private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
- StatisticalType statisticalType, SelectedPartitions
selectedPartitions,
- boolean needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, planNodeName, statisticalType, needCheckColumnPriv,
sv);
+ StatisticalType statisticalType,
+ SelectedPartitions selectedPartitions, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, planNodeName, statisticalType, scanContext,
+ needCheckColumnPriv, sv);
table = (MaxComputeExternalTable) desc.getTable();
this.selectedPartitions = selectedPartitions;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
index da4c96c7a59..9d8bf242823 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -66,8 +67,8 @@ public class OdbcScanNode extends ExternalScanNode {
/**
* Constructs node to scan given data files of table 'tbl'.
*/
- public OdbcScanNode(PlanNodeId id, TupleDescriptor desc, OdbcTable tbl) {
- super(id, desc, "SCAN ODBC", StatisticalType.ODBC_SCAN_NODE, false);
+ public OdbcScanNode(PlanNodeId id, TupleDescriptor desc, OdbcTable tbl,
ScanContext scanContext) {
+ super(id, desc, "SCAN ODBC", StatisticalType.ODBC_SCAN_NODE,
scanContext, false);
connectString = tbl.getConnectString();
odbcType = tbl.getOdbcTableType();
tblName = JdbcTable.databaseProperName(odbcType,
tbl.getOdbcTableName());
@@ -215,7 +216,11 @@ public class OdbcScanNode extends ExternalScanNode {
@Override
public int getNumInstances() {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return 1;
+ }
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
public static boolean shouldPushDownConjunct(TOdbcTableType tableType,
Expr expr) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 3004e9dc027..5a7ecec791f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -37,6 +37,7 @@ import
org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -148,8 +149,10 @@ public class PaimonScanNode extends FileQueryScanNode {
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
boolean needCheckColumnPriv,
- SessionVariable sv) {
- super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
needCheckColumnPriv, sv);
+ SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
+ scanContext, needCheckColumnPriv, sv);
}
@Override
@@ -404,7 +407,8 @@ public class PaimonScanNode extends FileQueryScanNode {
// if applyCountPushdown is true, calcute row count for count pushdown
if (applyCountPushdown && !pushDownCountSplits.isEmpty()) {
if (pushDownCountSum > COUNT_WITH_PARALLEL_SPLITS) {
- int minSplits = sessionVariable.getParallelExecInstanceNum() *
numBackends;
+ int minSplits =
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName())
+ * numBackends;
pushDownCountSplits = pushDownCountSplits.subList(0,
Math.min(pushDownCountSplits.size(), minSplits));
} else {
pushDownCountSplits =
Collections.singletonList(pushDownCountSplits.get(0));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
index 64cba7f8a4e..44d89970471 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -99,9 +100,9 @@ public class TrinoConnectorScanNode extends
FileQueryScanNode {
private Constraint constraint;
public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
- SessionVariable sv) {
- super(id, desc, "TRINO_CONNECTOR_SCAN_NODE",
StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv,
- sv);
+ SessionVariable sv, ScanContext scanContext) {
+ super(id, desc, "TRINO_CONNECTOR_SCAN_NODE",
StatisticalType.TRINO_CONNECTOR_SCAN_NODE,
+ scanContext, needCheckColumnPriv, sv);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index ac34ac817c4..eae0ab4f406 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -45,8 +46,9 @@ public class MetadataScanNode extends ExternalScanNode {
private boolean initedScanRangeLocations = false;
private final List<TScanRangeLocations> scanRangeLocations =
Lists.newArrayList();
- public MetadataScanNode(PlanNodeId id, TupleDescriptor desc,
MetadataTableValuedFunction tvf) {
- super(id, desc, "METADATA_SCAN_NODE",
StatisticalType.METADATA_SCAN_NODE, false);
+ public MetadataScanNode(PlanNodeId id, TupleDescriptor desc,
MetadataTableValuedFunction tvf,
+ ScanContext scanContext) {
+ super(id, desc, "METADATA_SCAN_NODE",
StatisticalType.METADATA_SCAN_NODE, scanContext, false);
this.tvf = tvf;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 12db0b7c1bf..6ba1b5d4a32 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.FileSplit.FileSplitCreator;
import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -68,8 +69,9 @@ public class TVFScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv) {
- super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE,
needCheckColumnPriv, sv);
+ public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv, SessionVariable sv,
+ ScanContext scanContext) {
+ super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE,
scanContext, needCheckColumnPriv, sv);
table = (FunctionGenTable) this.desc.getTable();
tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
}
@@ -142,7 +144,7 @@ public class TVFScanNode extends FileQueryScanNode {
// Push down count optimization.
boolean needSplit = true;
if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT) {
- int parallelNum = sessionVariable.getParallelExecInstanceNum();
+ int parallelNum =
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
int totalFileNum = fileStatuses.size();
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
index 9f828847eaa..b113a3d1cc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
@@ -99,7 +99,8 @@ class CostModel extends PlanVisitor<Cost, PlanContext> {
parallelInstance = 8;
} else {
beNumber = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
- parallelInstance = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum());
+ String clusterName =
sessionVariable.resolveCloudClusterName(connectContext);
+ parallelInstance = Math.max(1,
sessionVariable.getParallelExecInstanceNum(clusterName));
}
this.hboPlanStatisticsProvider =
Objects.requireNonNull(Env.getCurrentEnv().getHboPlanStatisticsManager()
.getHboPlanStatisticsProvider(), "HboPlanStatisticsProvider is
null");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 0a98be0281b..dcf5e3ec8eb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -69,7 +69,6 @@ import
org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
import org.apache.doris.datasource.odbc.source.OdbcScanNode;
-import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.paimon.source.PaimonScanNode;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
import
org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
@@ -649,10 +648,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
switch (((HMSExternalTable) table).getDlaType()) {
case ICEBERG:
- scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
break;
case HIVE:
- scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv, directoryLister);
+ scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv, directoryLister,
+ context.getScanContext());
HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
if (fileScan.getTableSample().isPresent()) {
@@ -671,18 +672,23 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
throw new RuntimeException("do not support DLA type " +
((HMSExternalTable) table).getDlaType());
}
} else if (table instanceof IcebergExternalTable) {
- scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
- } else if (table instanceof PaimonExternalTable) {
- scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
+ } else if (table.getType() == TableIf.TableType.PAIMON_EXTERNAL_TABLE)
{
+ scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else if (table instanceof TrinoConnectorExternalTable) {
- scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- fileScan.getSelectedPartitions(), false, sv);
+ fileScan.getSelectedPartitions(), false, sv,
context.getScanContext());
} else if (table instanceof LakeSoulExternalTable) {
- scanNode = new LakeSoulScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new LakeSoulScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else if (table instanceof RemoteDorisExternalTable) {
- scanNode = new RemoteDorisScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv);
+ scanNode = new RemoteDorisScanNode(context.nextPlanNodeId(),
tupleDescriptor, false, sv,
+ context.getScanContext());
} else {
throw new RuntimeException("do not support table type " +
table.getType());
}
@@ -721,7 +727,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
TableIf table = esScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
EsScanNode esScanNode = new EsScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- table instanceof EsExternalTable);
+ table instanceof EsExternalTable, context.getScanContext());
esScanNode.setNereidsId(esScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(esScan.getId(),
esScanNode.getId());
Utils.execWithUncheckedException(esScanNode::init);
@@ -766,7 +772,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(),
tupleDescriptor, false,
hudiScan.getScanParams(), hudiScan.getIncrementalRelation(),
ConnectContext.get().getSessionVariable(),
- directoryLister);
+ directoryLister, context.getScanContext());
if (fileScan.getTableSnapshot().isPresent()) {
((FileQueryScanNode)
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
@@ -807,7 +813,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
TableIf table = jdbcScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
JdbcScanNode jdbcScanNode = new JdbcScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- table instanceof JdbcExternalTable);
+ table instanceof JdbcExternalTable, context.getScanContext());
jdbcScanNode.setNereidsId(jdbcScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(jdbcScan.getId(),
jdbcScanNode.getId());
Utils.execWithUncheckedException(jdbcScanNode::init);
@@ -826,7 +832,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
TableIf table = odbcScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
OdbcScanNode odbcScanNode = new OdbcScanNode(context.nextPlanNodeId(),
tupleDescriptor,
- (OdbcTable) table);
+ (OdbcTable) table, context.getScanContext());
odbcScanNode.setNereidsId(odbcScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(odbcScan.getId(),
odbcScanNode.getId());
Utils.execWithUncheckedException(odbcScanNode::init);
@@ -867,7 +873,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context);
}
- OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(),
tupleDescriptor, "OlapScanNode");
+ OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(),
tupleDescriptor, "OlapScanNode",
+ context.getScanContext());
olapScanNode.setNereidsId(olapScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(olapScan.getId(),
olapScanNode.getId());
@@ -1058,12 +1065,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
scanNode = new
BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), table,
tupleDescriptor,
schemaScan.getSchemaCatalog().orElse(null),
schemaScan.getSchemaDatabase().orElse(null),
schemaScan.getSchemaTable().orElse(null),
- translateToExprs(schemaScan.getFrontendConjuncts(),
context));
+ translateToExprs(schemaScan.getFrontendConjuncts(),
context), context.getScanContext());
} else {
scanNode = new SchemaScanNode(context.nextPlanNodeId(),
tupleDescriptor,
schemaScan.getSchemaCatalog().orElse(null),
schemaScan.getSchemaDatabase().orElse(null),
schemaScan.getSchemaTable().orElse(null),
translateToExprs(schemaScan.getFrontendConjuncts(),
- context));
+ context), context.getScanContext());
}
scanNode.setNereidsId(schemaScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(schemaScan.getId(),
scanNode.getId());
@@ -1410,7 +1417,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
context.addExprIdSlotRefPair(consumerSlot.getExprId(),
slotRef);
}
}
- CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
+ CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor,
context.getScanContext());
translateRuntimeFilter(cteConsumer, cteScanNode, context);
context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(),
cteScanNode);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index dd9486367c2..7420524239d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -44,6 +44,7 @@ import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -67,6 +68,7 @@ import javax.annotation.Nullable;
public class PlanTranslatorContext {
private final ConnectContext connectContext;
private final StatementContext statementContext;
+ private final ScanContext scanContext;
private final List<PlanFragment> planFragments = Lists.newArrayList();
private DescriptorTable descTable;
@@ -123,6 +125,11 @@ public class PlanTranslatorContext {
public PlanTranslatorContext(CascadesContext ctx) {
this.connectContext = ctx.getConnectContext();
this.statementContext = ctx.getStatementContext();
+ this.scanContext = connectContext == null ||
connectContext.getSessionVariable() == null
+ ? ScanContext.EMPTY
+ : ScanContext.builder()
+
.clusterName(connectContext.getSessionVariable().resolveCloudClusterName(connectContext))
+ .build();
this.translator = new
RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
@@ -133,6 +140,11 @@ public class PlanTranslatorContext {
public PlanTranslatorContext(CascadesContext ctx, DescriptorTable
descTable) {
this.connectContext = ctx.getConnectContext();
this.statementContext = ctx.getStatementContext();
+ this.scanContext = connectContext == null ||
connectContext.getSessionVariable() == null
+ ? ScanContext.EMPTY
+ : ScanContext.builder()
+
.clusterName(connectContext.getSessionVariable().resolveCloudClusterName(connectContext))
+ .build();
this.translator = new
RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
@@ -146,6 +158,7 @@ public class PlanTranslatorContext {
public PlanTranslatorContext() {
this.connectContext = null;
this.statementContext = new StatementContext();
+ this.scanContext = ScanContext.EMPTY;
this.translator = null;
this.topnFilterContext = new TopnFilterContext();
IdGenerator<RuntimeFilterId> runtimeFilterIdGen =
RuntimeFilterId.createGenerator();
@@ -279,6 +292,14 @@ public class PlanTranslatorContext {
physicalRelations.add(physicalRelation);
}
+ public String getClusterName() {
+ return scanContext.getClusterName();
+ }
+
+ public ScanContext getScanContext() {
+ return scanContext;
+ }
+
public List<PhysicalRelation> getPhysicalRelations() {
return physicalRelations;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
index 4998e22ca0f..42999dd3532 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
@@ -36,6 +36,7 @@ import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TBrokerFileStatus;
@@ -195,7 +196,10 @@ public class NereidsLoadingTaskPlanner {
}
// Create a single FileLoadScanNode for all file groups
- FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfos.get(0).getDestTuple());
+ String clusterName = ConnectContext.get() == null ? ""
+ :
ConnectContext.get().getSessionVariable().resolveCloudClusterName();
+ FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfos.get(0).getDestTuple(),
+ ScanContext.builder().clusterName(clusterName).build());
fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts,
loadPlanInfos);
scanNodes.add(fileScanNode);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index 4cc7c3384e8..a1c912eede8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -38,6 +38,7 @@ import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FrontendOptions;
@@ -253,7 +254,10 @@ public class NereidsStreamLoadPlanner {
scanTupleDesc.setTable(destTable);
NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo =
planInfoCollector.collectLoadPlanInfo(streamLoadPlan,
descriptorTable, scanTupleDesc);
- FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfo.getDestTuple());
+ String clusterName = ConnectContext.get() == null ? ""
+ :
ConnectContext.get().getSessionVariable().resolveCloudClusterName();
+ FileLoadScanNode fileScanNode = new FileLoadScanNode(new
PlanNodeId(0), loadPlanInfo.getDestTuple(),
+ ScanContext.builder().clusterName(clusterName).build());
fileScanNode.finalizeForNereids(loadId,
Lists.newArrayList(fileGroupInfo), Lists.newArrayList(context),
Lists.newArrayList(loadPlanInfo));
scanNode = fileScanNode;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 5e1346c2991..af4baee6043 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -311,9 +311,10 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
int prunedPartNum = candidate.getSelectedPartitionIds().size();
int bucketNum =
candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int totalBucketNum = prunedPartNum * bucketNum;
- int backEndNum = Math.max(1,
ConnectContext.get().getEnv().getClusterInfo()
- .getBackendsNumber(true));
- int paraNum = Math.max(1,
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
+ ConnectContext connectContext = ConnectContext.get();
+ int backEndNum = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int paraNum = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
return totalBucketNum < backEndNum * paraNum * 0.8;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
index 1227d63ebc8..dd04b5ceee8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
@@ -340,7 +340,9 @@ public class SaltJoin extends OneRewriteRuleFactory {
.getSessionVariable().skewRewriteJoinSaltExplodeFactor;
if (factor == 0) {
int beNumber = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
- int parallelInstance = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum());
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int parallelInstance = Math.max(1,
+
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
factor = (int) Math.min((long) beNumber * parallelInstance *
SALT_FACTOR, Integer.MAX_VALUE);
}
return Math.max(factor, 1);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
index f7610480efa..635cf1bbac0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
@@ -96,8 +96,9 @@ public class BackendPartitionedSchemaScanNode extends
SchemaScanNode {
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
public BackendPartitionedSchemaScanNode(PlanNodeId id, TableIf table,
TupleDescriptor desc,
- String schemaCatalog, String schemaDatabase, String schemaTable,
List<Expr> frontendConjuncts) {
- super(id, desc, schemaCatalog, schemaDatabase, schemaTable,
frontendConjuncts);
+ String schemaCatalog, String schemaDatabase, String schemaTable,
List<Expr> frontendConjuncts,
+ ScanContext scanContext) {
+ super(id, desc, schemaCatalog, schemaDatabase, schemaTable,
frontendConjuncts, scanContext);
this.tableIf = table;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
index 1e53a9d1cb5..7f5cf61b4c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
@@ -32,8 +32,8 @@ import java.util.List;
public class CTEScanNode extends ScanNode {
private static final PlanNodeId UNINITIAL_PLANNODEID = new PlanNodeId(-1);
- public CTEScanNode(TupleDescriptor desc) {
- super(UNINITIAL_PLANNODEID, desc, "CTEScanNode",
StatisticalType.CTE_SCAN_NODE);
+ public CTEScanNode(TupleDescriptor desc, ScanContext scanContext) {
+ super(UNINITIAL_PLANNODEID, desc, "CTEScanNode", scanContext,
StatisticalType.CTE_SCAN_NODE);
}
public void setPlanNodeId(PlanNodeId id) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index aa873276d62..b754155431d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -45,8 +45,9 @@ public class DataGenScanNode extends ExternalScanNode {
private DataGenTableValuedFunction tvf;
- public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
DataGenTableValuedFunction tvf) {
- super(id, desc, "DataGenScanNode",
StatisticalType.TABLE_VALUED_FUNCTION_NODE, false);
+ public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
DataGenTableValuedFunction tvf,
+ ScanContext scanContext) {
+ super(id, desc, "DataGenScanNode",
StatisticalType.TABLE_VALUED_FUNCTION_NODE, scanContext, false);
this.tvf = tvf;
}
@@ -95,8 +96,9 @@ public class DataGenScanNode extends ExternalScanNode {
// by multi-processes or multi-threads. So we assign instance number to 1.
@Override
public int getNumInstances() {
- if
(ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context != null &&
context.getSessionVariable().isIgnoreStorageDataDistribution()) {
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
return 1;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index c2e873c6613..32d74d76aa0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -59,8 +59,8 @@ public class FileLoadScanNode extends FileScanNode {
* External file scan node for load from file
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc) {
- super(id, desc, "FILE_LOAD_SCAN_NODE", StatisticalType.FILE_SCAN_NODE,
false);
+ public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc, ScanContext
scanContext) {
+ super(id, desc, "FILE_LOAD_SCAN_NODE", StatisticalType.FILE_SCAN_NODE,
scanContext, false);
}
public void finalizeForNereids(TUniqueId loadId,
List<NereidsFileGroupInfo> fileGroupInfos,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
index ef95ee61e3c..b2d1c02976c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
@@ -35,9 +35,8 @@ public class GroupCommitScanNode extends ExternalScanNode {
long tableId;
- public GroupCommitScanNode(PlanNodeId id, TupleDescriptor desc, long
tableId) {
- super(id, desc, "GROUP_COMMIT_SCAN_NODE",
- StatisticalType.GROUP_COMMIT_SCAN_NODE, false);
+ public GroupCommitScanNode(PlanNodeId id, TupleDescriptor desc, long
tableId, ScanContext scanContext) {
+ super(id, desc, "GROUP_COMMIT_SCAN_NODE",
StatisticalType.GROUP_COMMIT_SCAN_NODE, scanContext, false);
this.tableId = tableId;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 24932745b95..38a56350ebd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -212,8 +212,8 @@ public class OlapScanNode extends ScanNode {
private Column globalRowIdColumn;
// Constructs node to scan given data files of table 'tbl'.
- public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName) {
- super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
+ public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName, ScanContext scanContext) {
+ super(id, desc, planNodeName, scanContext,
StatisticalType.OLAP_SCAN_NODE);
olapTable = (OlapTable) desc.getTable();
distributionColumnIds = Sets.newTreeSet();
@@ -1091,8 +1091,9 @@ public class OlapScanNode extends ScanNode {
public int getNumInstances() {
// In pipeline exec engine, the instance num equals be_num * parallel
instance.
// so here we need count distinct be_num to do the work. make sure get
right instance
- if
(ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
- return
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context != null &&
context.getSessionVariable().isIgnoreStorageDataDistribution()) {
+ return
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
}
return scanRangeLocations.size();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index be92a932c03..1b4e35d3821 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -228,8 +228,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
* Assign ParallelExecNum by default value for Asynchronous request
*/
public void setParallelExecNumIfExists() {
- if (ConnectContext.get() != null) {
- parallelExecNum =
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ ConnectContext context = ConnectContext.get();
+ if (context != null) {
+ String clusterName =
context.getSessionVariable().resolveCloudClusterName(context);
+ parallelExecNum =
context.getSessionVariable().getParallelExecInstanceNum(clusterName);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
new file mode 100644
index 00000000000..875bbffa8e3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+/**
+ * Shared context for scan planning/runtime decisions.
+ * <p>
+ * Keep this object immutable so scan nodes can safely cache it and
+ * so that its fields can evolve incrementally in the future.
+ */
+public final class ScanContext {
+ public static final ScanContext EMPTY = new ScanContext("");
+
+ private final String clusterName;
+
+ private ScanContext(String clusterName) {
+ this.clusterName = clusterName == null ? "" : clusterName;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public static final class Builder {
+ private String clusterName = "";
+
+ public Builder clusterName(String clusterName) {
+ this.clusterName = clusterName;
+ return this;
+ }
+
+ public ScanContext build() {
+ if (clusterName == null || clusterName.isEmpty()) {
+ return ScanContext.EMPTY;
+ }
+ return new ScanContext(clusterName);
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index bb7eca6accc..cd26b28bcbd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -74,6 +74,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -112,10 +113,18 @@ public abstract class ScanNode extends PlanNode
implements SplitGenerator {
// This is also important for local shuffle logic.
// Now only OlapScanNode and FileQueryScanNode implement this.
protected HashSet<Long> scanBackendIds = new HashSet<>();
+ // Immutable scan context used for evolving scan-related metadata.
+ protected final ScanContext scanContext;
- public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType) {
+ public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
ScanContext scanContext) {
+ this(id, desc, planNodeName, scanContext, StatisticalType.DEFAULT);
+ }
+
+ public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
ScanContext scanContext,
+ StatisticalType statisticalType) {
super(id, desc.getId().asList(), planNodeName, statisticalType);
this.desc = desc;
+ this.scanContext = Objects.requireNonNull(scanContext, "scanContext
can not be null");
}
protected List<Column> getColumns() {
@@ -702,11 +711,21 @@ public abstract class ScanNode extends PlanNode
implements SplitGenerator {
return selectedSplitNum;
}
+ public ScanContext getScanContext() {
+ return scanContext;
+ }
+
@Override
public boolean isSerialOperator() {
- return numScanBackends() <= 0 || getScanRangeNum()
- <
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() *
numScanBackends()
- || (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return numScanBackends() <= 0;
+ }
+ int parallelExecInstanceNum = context.getSessionVariable()
+ .getParallelExecInstanceNum(scanContext.getClusterName());
+ return numScanBackends() <= 0
+ || getScanRangeNum() < parallelExecInstanceNum *
numScanBackends()
+ || context.getSessionVariable().isForceToLocalShuffle();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index 52932fe65d1..9b6a54a1957 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -66,8 +66,9 @@ public class SchemaScanNode extends ScanNode {
* Constructs node to scan given data files of table 'tbl'.
*/
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc,
- String schemaCatalog, String schemaDb, String schemaTable,
List<Expr> frontendConjuncts) {
- super(id, desc, "SCAN SCHEMA", StatisticalType.SCHEMA_SCAN_NODE);
+ String schemaCatalog, String schemaDb, String schemaTable,
List<Expr> frontendConjuncts,
+ ScanContext scanContext) {
+ super(id, desc, "SCAN SCHEMA", scanContext,
StatisticalType.SCHEMA_SCAN_NODE);
this.tableName = desc.getTable().getName();
this.schemaCatalog = schemaCatalog;
this.schemaDb = schemaDb;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 81c6e40eb73..34d1fce5a70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.VariableAnnotation;
@@ -3980,7 +3981,7 @@ public class SessionVariable implements Serializable,
Writable {
this.debugSkipFoldConstant = debugSkipFoldConstant;
}
- public int getParallelExecInstanceNum() {
+ public int getParallelExecInstanceNum(String clusterName) {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getEnv() != null &&
connectContext.getEnv().getAuth() != null) {
int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
@@ -3989,8 +3990,12 @@ public class SessionVariable implements Serializable,
Writable {
return userParallelExecInstanceNum;
}
}
+ String resolvedClusterName = clusterName;
+ if (Config.isCloudMode() &&
Strings.isNullOrEmpty(resolvedClusterName)) {
+ resolvedClusterName = resolveCloudClusterName(connectContext);
+ }
if (parallelPipelineTaskNum == 0) {
- int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+ int size =
Env.getCurrentSystemInfo().getMinPipelineExecutorSize(resolvedClusterName);
int autoInstance = (size + 1) / 2;
return Math.min(autoInstance, maxInstanceNum);
} else {
@@ -3998,6 +4003,31 @@ public class SessionVariable implements Serializable,
Writable {
}
}
+ public String resolveCloudClusterName() {
+ return resolveCloudClusterName(ConnectContext.get());
+ }
+
+ public String resolveCloudClusterName(ConnectContext connectContext) {
+ if (!Config.isCloudMode()) {
+ return "";
+ }
+ if (!Strings.isNullOrEmpty(cloudCluster)) {
+ return cloudCluster;
+ }
+ if (connectContext == null) {
+ return "";
+ }
+ try {
+ String clusterName = connectContext.getCloudCluster(false);
+ return clusterName == null ? "" : clusterName;
+ } catch (ComputeGroupException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("failed to resolve cloud cluster for parallel
instance num", e);
+ }
+ return "";
+ }
+ }
+
public boolean getEnablePreferCachedRowset() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getEnv() != null &&
connectContext.getEnv().getAuth() != null) {
@@ -4912,7 +4942,8 @@ public class SessionVariable implements Serializable,
Writable {
}
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setEnableLocalShuffle(enableLocalShuffle);
- tResult.setParallelInstance(getParallelExecInstanceNum());
+ String clusterName = resolveCloudClusterName();
+ tResult.setParallelInstance(getParallelExecInstanceNum(clusterName));
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 71646ac6bc7..2c8d095c45f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -336,7 +336,9 @@ public class StmtExecutor {
builder.instancesNumPerBe(
beToInstancesNum.entrySet().stream().map(entry ->
entry.getKey() + ":" + entry.getValue())
.collect(Collectors.joining(",")));
-
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
+ String clusterName =
context.sessionVariable.resolveCloudClusterName(context);
+ builder.parallelFragmentExecInstance(
+
String.valueOf(context.sessionVariable.getParallelExecInstanceNum(clusterName)));
builder.traceId(context.getSessionVariable().getTraceId());
builder.isNereids(context.getState().isNereids() ? "Yes" : "No");
return builder.build();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 59264ee6bb6..df2bac3786f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -1128,7 +1128,9 @@ public class SystemInfoService {
return idToBackendRef;
}
- public int getMinPipelineExecutorSize() {
+ // CloudSystemInfoService override.
+ // Non-cloud ignores clusterName and calculates from all backends.
+ public int getMinPipelineExecutorSize(String clusterName) {
List<Backend> currentBackends = null;
try {
currentBackends = getAllBackendsByAllCluster().values().asList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index 66f344f03be..c4be29b1617 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.DataGenScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TDataGenFunctionName;
@@ -34,6 +35,7 @@ public abstract class DataGenTableValuedFunction extends
TableValuedFunctionIf {
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new DataGenScanNode(id, desc, this);
+ return new DataGenScanNode(id, desc, this,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e543574a561..e08eaa2c825 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -52,6 +52,7 @@ import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
@@ -240,7 +241,8 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new TVFScanNode(id, desc, false, sv);
+ return new TVFScanNode(id, desc, false, sv,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
@Override
@@ -540,4 +542,3 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
}
}
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 111c3b9370d..074308bea32 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -91,7 +92,8 @@ public class GroupCommitTableValuedFunction extends
ExternalFileTableValuedFunct
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new GroupCommitScanNode(id, desc, tableId);
+ return new GroupCommitScanNode(id, desc, tableId,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
index 040d47c7b41..e39857e4588 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
@@ -54,6 +55,7 @@ public class JdbcQueryTableValueFunction extends
QueryTableValueFunction {
desc.getTable().getFullSchema(), TableType.JDBC);
catalog.configureJdbcTable(jdbcTable, desc.getTable().getName());
desc.setTable(jdbcTable);
- return new JdbcScanNode(id, desc, true, query);
+ return new JdbcScanNode(id, desc, true, query,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 32be139657c..39fde6a5615 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.tvf.source.MetadataScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TMetaScanRange;
@@ -64,6 +65,7 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
- return new MetadataScanNode(id, desc, this);
+ return new MetadataScanNode(id, desc, this,
+
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 85d14585f21..85bd4677a0c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -369,7 +369,7 @@ public class CloudSystemInfoServiceTest {
try {
// Since there are no backends in the cluster, should return 1
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(1, result);
} finally {
ConnectContext.remove();
@@ -403,7 +403,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return the pipeline executor size of the single backend
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(8, result);
} finally {
ConnectContext.remove();
@@ -454,7 +454,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return the minimum pipeline executor size (6)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(6, result);
} finally {
ConnectContext.remove();
@@ -505,7 +505,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return the minimum positive pipeline executor size (4)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(4, result);
} finally {
ConnectContext.remove();
@@ -549,7 +549,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 1 when no valid pipeline executor sizes are
// found
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(1, result);
} finally {
ConnectContext.remove();
@@ -565,7 +565,7 @@ public class CloudSystemInfoServiceTest {
createTestConnectContext(null);
try {
// Should return 1 when no cluster is set in ConnectContext
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result);
} finally {
ConnectContext.remove();
@@ -628,7 +628,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 8 (minimum valid size)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(8, result);
} finally {
ConnectContext.remove();
@@ -679,7 +679,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 512 (minimum among large values)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(512, result);
} finally {
ConnectContext.remove();
@@ -715,7 +715,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 32 (consistent across all backends)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(clusterName);
Assert.assertEquals(32, result);
} finally {
ConnectContext.remove();
@@ -786,7 +786,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 8 (minimum from current cluster2), not 2 (global
minimum from cluster1)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(cluster2Name);
Assert.assertEquals(8, result);
} finally {
ConnectContext.remove();
@@ -860,14 +860,14 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 32 (minimum from virtual cluster's physical
cluster), not 8
// (from other cluster)
- int result = infoService.getMinPipelineExecutorSize();
+ int result =
infoService.getMinPipelineExecutorSize(virtualClusterName);
Assert.assertEquals(32, result);
// Switch to other cluster
ctx.setCloudCluster(otherClusterName);
// Should return 8 (from other cluster)
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize(otherClusterName);
Assert.assertEquals(8, result);
} finally {
@@ -885,7 +885,7 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 1 because no cluster is set (will catch
AnalysisException)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result);
} finally {
@@ -958,14 +958,14 @@ public class CloudSystemInfoServiceTest {
try {
// Should return 2 (minimum from cluster1), not 16 (minimum from
cluster2)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize(cluster1Name);
Assert.assertEquals(2, result);
// Now switch to cluster2
ctx.setCloudCluster(cluster2Name);
// Should return 16 (minimum from cluster2), not 2 (minimum from
cluster1)
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize(cluster2Name);
Assert.assertEquals(16, result);
} finally {
// Clean up ConnectContext
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
index 8b1d98e509a..aee814907bc 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
@@ -22,8 +22,8 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import org.junit.Assert;
@@ -38,8 +38,7 @@ public class FileQueryScanNodeTest {
private static class TestFileQueryScanNode extends FileQueryScanNode {
TestFileQueryScanNode(SessionVariable sv) {
- super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)),
"test",
- StatisticalType.TEST_EXTERNAL_TABLE, false, sv);
+ super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)),
"test", ScanContext.EMPTY, false, sv);
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
index 727ff939003..ba7687157ea 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.junit.Assert;
@@ -46,7 +47,7 @@ public class HiveScanNodeTest {
Mockito.when(table.getCatalog()).thenReturn(catalog);
Mockito.when(catalog.bindBrokerName()).thenReturn("");
desc.setTable(table);
- HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null);
+ HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null, ScanContext.EMPTY);
HiveMetaStoreCache.FileCacheValue fileCacheValue = new
HiveMetaStoreCache.FileCacheValue();
HiveMetaStoreCache.HiveFileStatus status = new
HiveMetaStoreCache.HiveFileStatus();
@@ -71,7 +72,7 @@ public class HiveScanNodeTest {
Mockito.when(table.getCatalog()).thenReturn(catalog);
Mockito.when(catalog.bindBrokerName()).thenReturn("");
desc.setTable(table);
- HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null);
+ HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false,
sv, null, ScanContext.EMPTY);
HiveMetaStoreCache.FileCacheValue fileCacheValue = new
HiveMetaStoreCache.FileCacheValue();
HiveMetaStoreCache.HiveFileStatus status = new
HiveMetaStoreCache.HiveFileStatus();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
index 48031a2303e..e16ee803dca 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.iceberg.source;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.iceberg.DataFile;
@@ -37,7 +38,7 @@ public class IcebergScanNodeTest {
private static class TestIcebergScanNode extends IcebergScanNode {
TestIcebergScanNode(SessionVariable sv) {
- super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv);
+ super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv,
ScanContext.EMPTY);
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 09795c53910..454f2d42d9e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.paimon.data.BinaryRow;
@@ -60,7 +61,7 @@ public class PaimonScanNodeTest {
public void testSplitWeight() throws UserException {
TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
- PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1),
desc, false, sv);
+ PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1),
desc, false, sv, ScanContext.EMPTY);
paimonScanNode.setSource(new PaimonSource());
@@ -387,7 +388,8 @@ public class PaimonScanNodeTest {
public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws
Exception {
SessionVariable sv = new SessionVariable();
sv.setMaxFileSplitNum(100);
- PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new
TupleDescriptor(new TupleId(0)), false, sv);
+ PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new
TupleDescriptor(new TupleId(0)),
+ false, sv, ScanContext.EMPTY);
PaimonSource source = Mockito.mock(PaimonSource.class);
Mockito.when(source.getFileFormatFromTableProperties()).thenReturn("parquet");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
index a3df5dc8a04..9700c306f51 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.tvf.source;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.MetadataTableValuedFunction;
import org.apache.doris.thrift.TMetaScanRange;
@@ -63,7 +64,7 @@ public class MetadataScanNodeTest {
*/
@Test
public void testInitedScanRangeLocationsInitialState() throws Exception {
- MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf);
+ MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf, ScanContext.EMPTY);
// Use reflection to access the private field
Field field =
MetadataScanNode.class.getDeclaredField("initedScanRangeLocations");
@@ -90,7 +91,7 @@ public class MetadataScanNodeTest {
Mockito.when(mockTvf.getMetaScanRange(Mockito.anyList())).thenReturn(metaScanRange);
- MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf);
+ MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf, ScanContext.EMPTY);
// Mock the backend policy using reflection
mockBackendPolicy(scanNode);
@@ -128,7 +129,7 @@ public class MetadataScanNodeTest {
Mockito.when(mockTvf.getMetaScanRange(Mockito.anyList())).thenReturn(metaScanRange);
- MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf);
+ MetadataScanNode scanNode = new MetadataScanNode(planNodeId,
tupleDescriptor, mockTvf, ScanContext.EMPTY);
mockBackendPolicy(scanNode);
// Call getScanRangeLocations multiple times
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
index 8d591362376..8cf98daea94 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
@@ -45,7 +46,7 @@ public class TVFScanNodeTest {
ExternalFileTableValuedFunction tvf =
Mockito.mock(ExternalFileTableValuedFunction.class);
Mockito.when(table.getTvf()).thenReturn(tvf);
desc.setTable(table);
- TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv);
+ TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv,
ScanContext.EMPTY);
TBrokerFileStatus status = new TBrokerFileStatus();
status.setSize(10_000L * MB);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
index 2e69f7e31f9..79618651714 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
@@ -42,6 +42,7 @@ import
org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.SqlCache;
@@ -227,7 +228,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
TupleDescriptor desc = new TupleDescriptor(new TupleId(1));
desc.setTable(mgr.getInternalCatalog().getDbNullable("test").getTableNullable("tbl1"));
- olapScanNode = new OlapScanNode(new PlanNodeId(1), desc,
"tb1ScanNode");
+ olapScanNode = new OlapScanNode(new PlanNodeId(1), desc,
"tb1ScanNode", ScanContext.EMPTY);
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
index f499caa96f6..e897ddc0f15 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
@@ -56,6 +56,7 @@ import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.cache.Cache;
@@ -378,7 +379,7 @@ public class OlapQueryCacheTest {
OlapTable table = createProfileTable();
TupleDescriptor desc = new TupleDescriptor(new TupleId(20004));
desc.setTable(table);
- OlapScanNode node = new OlapScanNode(new PlanNodeId(20008), desc,
"userprofilenode");
+ OlapScanNode node = new OlapScanNode(new PlanNodeId(20008), desc,
"userprofilenode", ScanContext.EMPTY);
node.setSelectedPartitionIds(selectedPartitionIds);
return node;
}
@@ -491,7 +492,7 @@ public class OlapQueryCacheTest {
OlapTable table = createEventTable();
TupleDescriptor desc = new TupleDescriptor(new TupleId(30002));
desc.setTable(table);
- OlapScanNode node = new OlapScanNode(new PlanNodeId(30004), desc,
"appeventnode");
+ OlapScanNode node = new OlapScanNode(new PlanNodeId(30004), desc,
"appeventnode", ScanContext.EMPTY);
node.setSelectedPartitionIds(selectedPartitionIds);
return node;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 0ad09096626..033568017d9 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -447,7 +447,7 @@ public class SystemInfoServiceTest {
@Test
public void testGetMinPipelineExecutorSize() {
// Test case 1: No backends
- int result = infoService.getMinPipelineExecutorSize();
+ int result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result);
// Test case 2: Single backend with pipeline executor size = 8
@@ -456,7 +456,7 @@ public class SystemInfoServiceTest {
be1.setPipelineExecutorSize(8);
be1.setAlive(true);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(8, result);
// Test case 3: Multiple backends with different pipeline executor
sizes
@@ -470,7 +470,7 @@ public class SystemInfoServiceTest {
be3.setPipelineExecutorSize(12);
be3.setAlive(true);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(4, result);
// Test case 4: Backends with zero and negative pipeline executor
sizes (should
@@ -485,7 +485,7 @@ public class SystemInfoServiceTest {
be5.setPipelineExecutorSize(-1); // Should be ignored
be5.setAlive(true);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(4, result); // Still should be 4 from be2
// Test case 5: All backends have zero or negative pipeline executor
sizes
@@ -493,7 +493,7 @@ public class SystemInfoServiceTest {
be2.setPipelineExecutorSize(-5);
be3.setPipelineExecutorSize(0);
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(1, result); // Should return default value 1
// Test case 6: Mix of positive and non-positive values
@@ -501,7 +501,7 @@ public class SystemInfoServiceTest {
be2.setPipelineExecutorSize(0); // ignored
be3.setPipelineExecutorSize(6); // This should be the minimum
- result = infoService.getMinPipelineExecutorSize();
+ result = infoService.getMinPipelineExecutorSize("");
Assert.assertEquals(6, result);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]