This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 40767003c62 [Fix](ScanNode) Move the finalize phase of ScanNode to
after the end of the Physical Translate phase (#38604)
40767003c62 is described below
commit 40767003c627d4adf9d3b7e059ca9a1ee722015e
Author: Tiewei Fang <[email protected]>
AuthorDate: Mon Aug 5 08:58:59 2024 +0800
[Fix](ScanNode) Move the finalize phase of ScanNode to after the end of the
Physical Translate phase (#38604)
bp: #37565
Currently, Doris first obtains splits and then performs projection.
After column pruning, it calls `updateRequiredSlots` to update the
scanRange information. However, the Trino connector's column pruning
pushdown needs to be completed before obtaining splits.
Therefore, we move the finalize phase of `ScanNode` to after the end of
the `Physical Translate` phase, so that `createScanRangeLocations` can
use the final columns that remain after pruning.
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
.../apache/doris/datasource/FileQueryScanNode.java | 13 --------
.../doris/datasource/jdbc/source/JdbcScanNode.java | 9 ------
.../doris/datasource/odbc/source/OdbcScanNode.java | 9 ------
.../datasource/paimon/source/PaimonScanNode.java | 19 ------------
.../glue/translator/PhysicalPlanTranslator.java | 36 ++++++++--------------
.../org/apache/doris/planner/OlapScanNode.java | 6 +++-
.../java/org/apache/doris/planner/ScanNode.java | 10 ------
7 files changed, 17 insertions(+), 85 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 9822855aa72..517ba8be5f8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
@@ -40,7 +39,6 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.datasource.iceberg.source.IcebergSplit;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -80,7 +78,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* FileQueryScanNode for querying the file access type of catalog, now only
support
@@ -182,16 +179,6 @@ public abstract class FileQueryScanNode extends
FileScanNode {
params.setSrcTupleId(-1);
}
- /**
- * Reset required_slots in contexts. This is called after Nereids planner
do the projection.
- * In the projection process, some slots may be removed. So call this to
update the slots info.
- */
- @Override
- public void updateRequiredSlots(PlanTranslatorContext
planTranslatorContext,
- Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
- updateRequiredSlots();
- }
-
private void updateRequiredSlots() throws UserException {
params.unsetRequiredSlots();
for (SlotDescriptor slot : desc.getSlots()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 0d292100fe0..a85dd4aaafb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
@@ -39,7 +38,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
@@ -59,7 +57,6 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
public class JdbcScanNode extends ExternalScanNode {
private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class);
@@ -252,12 +249,6 @@ public class JdbcScanNode extends ExternalScanNode {
createScanRangeLocations();
}
- @Override
- public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId>
requiredByProjectSlotIdSet)
- throws UserException {
- createJdbcColumns();
- }
-
@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations =
Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
index 9b597dddb54..2f9aa4a8334 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
@@ -33,7 +32,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
@@ -53,7 +51,6 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
/**
* Full scan of an ODBC table.
@@ -117,12 +114,6 @@ public class OdbcScanNode extends ExternalScanNode {
createScanRangeLocations();
}
- @Override
- public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId>
requiredByProjectSlotIdSet)
- throws UserException {
- createOdbcColumns();
- }
-
@Override
protected void createScanRangeLocations() throws UserException {
scanRangeLocations =
Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index c248bd0635a..6eb1545817a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -17,7 +17,6 @@
package org.apache.doris.datasource.paimon.source;
-import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
@@ -29,7 +28,6 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -40,7 +38,6 @@ import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
-import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
@@ -289,22 +286,6 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
- //When calling 'setPaimonParams' and 'getSplits', the column trimming has
not been performed yet,
- // Therefore, paimon_column_names is temporarily reset here
- @Override
- public void updateRequiredSlots(PlanTranslatorContext
planTranslatorContext,
- Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
- super.updateRequiredSlots(planTranslatorContext,
requiredByProjectSlotIdSet);
- String cols = desc.getSlots().stream().map(slot ->
slot.getColumn().getName())
- .collect(Collectors.joining(","));
- for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
- List<TFileRangeDesc> ranges =
tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges;
- for (TFileRangeDesc tFileRangeDesc : ranges) {
-
tFileRangeDesc.table_format_params.paimon_params.setPaimonColumnNames(cols);
- }
- }
- }
-
@Override
public TFileType getLocationType() throws DdlException,
MetaNotFoundException {
return getLocationType(((FileStoreTable)
source.getPaimonTable()).location().toString());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 65289ab5201..7247c9d9291 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -46,8 +46,6 @@ import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.source.EsScanNode;
@@ -274,6 +272,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
throw new AnalysisException("tables with unknown column stats:
" + builder);
}
}
+ for (ScanNode scanNode : context.getScanNodes()) {
+ Utils.execWithUncheckedException(scanNode::finalizeForNereids);
+ }
return rootFragment;
}
@@ -635,7 +636,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
)
);
- Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
esScanNode, dataPartition);
context.addPlanFragment(planFragment);
@@ -687,7 +687,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode,
context)
)
);
- Utils.execWithUncheckedException(scanNode::finalizeForNereids);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = createPlanFragment(scanNode,
dataPartition, fileScan);
@@ -712,7 +711,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
)
);
- Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
jdbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
@@ -736,7 +734,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
)
);
- Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
odbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
@@ -817,8 +814,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(olapScan,
olapScanNode);
}
- // TODO: we need to remove all finalizeForNereids
- olapScanNode.finalizeForNereids();
// Create PlanFragment
// TODO: use a util function to convert distribution to DataPartition
DataPartition dataPartition = DataPartition.RANDOM;
@@ -908,7 +903,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
.translateRuntimeFilterTarget(expr,
finalScanNode, context)
)
);
- Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode);
PlanFragment planFragment = createPlanFragment(scanNode,
DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
@@ -930,7 +924,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
.forEach(expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
)
);
- Utils.execWithUncheckedException(scanNode::finalizeForNereids);
context.addScanNode(scanNode);
// TODO: it is weird update label in this way
@@ -1976,6 +1969,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
requiredSlotIdSet.add(lastSlot.getId());
}
+ ((OlapScanNode) inputPlanNode).updateRequiredSlots(context,
requiredByProjectSlotIdSet);
}
updateScanSlotsMaterialization((ScanNode) inputPlanNode,
requiredSlotIdSet,
requiredByProjectSlotIdSet, slotIdsByOrder, context);
@@ -2443,22 +2437,16 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
if (scanNode.getTupleDesc().getSlots().isEmpty()) {
scanNode.getTupleDesc().getSlots().add(smallest);
}
- try {
- if (context.getSessionVariable() != null
- && context.getSessionVariable().forbidUnknownColStats
- &&
!StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
- for (SlotId slotId : requiredByProjectSlotIdSet) {
- if (context.isColumnStatsUnknown(scanNode, slotId)) {
- String colName =
scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
- throw new AnalysisException("meet unknown column
stats: " + colName);
- }
+ if (context.getSessionVariable() != null
+ && context.getSessionVariable().forbidUnknownColStats
+ &&
!StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
+ for (SlotId slotId : requiredByProjectSlotIdSet) {
+ if (context.isColumnStatsUnknown(scanNode, slotId)) {
+ String colName =
scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
+ throw new AnalysisException("meet unknown column stats: "
+ colName);
}
- context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
- scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet);
- } catch (UserException e) {
- Util.logAndThrowRuntimeException(LOG,
- "User Exception while reset external file scan node
contexts.", e);
+ context.removeScanFromStatsUnknownColumnsMap(scanNode);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 52104f6e668..09496d02578 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1720,7 +1720,11 @@ public class OlapScanNode extends ScanNode {
: Sets.newTreeSet();
}
- @Override
+ /**
+ * Update required_slots in scan node contexts. This is called after
Nereids planner do the projection.
+ * In the projection process, some slots may be removed. So call this to
update the slots info.
+ * Currently, it is only used by ExternalFileScanNode, add the interface
here to keep the Nereids code clean.
+ */
public void updateRequiredSlots(PlanTranslatorContext context,
Set<SlotId> requiredByProjectSlotIdSet) {
outputColumnUniqueIds.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index c2158b4a0d4..8400d1d0c27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -48,7 +48,6 @@ import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
@@ -169,15 +168,6 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
return false;
}
- /**
- * Update required_slots in scan node contexts. This is called after
Nereids planner do the projection.
- * In the projection process, some slots may be removed. So call this to
update the slots info.
- * Currently, it is only used by ExternalFileScanNode, add the interface
here to keep the Nereids code clean.
- */
- public void updateRequiredSlots(PlanTranslatorContext context,
- Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
- }
-
private void computeColumnFilter(Column column, SlotDescriptor slotDesc,
PartitionInfo partitionsInfo) {
// Set `columnFilters` all the time because `DistributionPruner` also
use this.
// Maybe we could use `columnNameToRange` for `DistributionPruner` and
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]