This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7fc42a5fa4f [fix](Nereids) handle continuous filter or project in plan
(#40176)
7fc42a5fa4f is described below
commit 7fc42a5fa4fd9bcc2259fde6e0d1ba9c49efec6f
Author: morrySnow <[email protected]>
AuthorDate: Tue Sep 10 15:46:59 2024 +0800
[fix](Nereids) handle continuous filter or project in plan (#40176)
if we meet continuous project or filter in translator, we try to
generate SelectNode as far as possible to avoid generate invalid plan
for example
```
Filter(conjuncts 1)
+-- Limit (limit 10)
+-- Filter(conjuncts 2)
+-- Aggregate
```
will be translated to
```
SELECT_NODE (conjuncts 1)
+-- AGGREGATE_NODE (conjuncts 2) (limit 10)
```
---
.../glue/translator/PhysicalPlanTranslator.java | 44 ++++++++++++++++------
1 file changed, 32 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 2a34bc3ca91..28456041f7d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -720,7 +720,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
JdbcScanNode jdbcScanNode = new JdbcScanNode(context.nextPlanNodeId(),
tupleDescriptor,
table instanceof JdbcExternalTable);
jdbcScanNode.setNereidsId(jdbcScan.getId());
-
jdbcScanNode.addConjuncts(translateToLegacyConjuncts(jdbcScan.getConjuncts()));
Utils.execWithUncheckedException(jdbcScanNode::init);
context.addScanNode(jdbcScanNode, jdbcScan);
context.getRuntimeTranslator().ifPresent(
@@ -744,7 +743,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
OdbcScanNode odbcScanNode = new OdbcScanNode(context.nextPlanNodeId(),
tupleDescriptor,
(OdbcTable) table);
odbcScanNode.setNereidsId(odbcScan.getId());
-
odbcScanNode.addConjuncts(translateToLegacyConjuncts(odbcScan.getConjuncts()));
Utils.execWithUncheckedException(odbcScanNode::init);
context.addScanNode(odbcScanNode, odbcScan);
context.getRuntimeTranslator().ifPresent(
@@ -1258,6 +1256,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
MultiCastDataSink multiCastDataSink = (MultiCastDataSink)
inputFragment.getSink();
DataStreamSink dataStreamSink =
multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
+ if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts())
+ ||
CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
+ String errMsg = "generate invalid plan \n" +
filter.treeString();
+ LOG.warn(errMsg);
+ throw new AnalysisException(errMsg);
+ }
filter.getConjuncts().stream()
.map(e -> ExpressionTranslator.translate(e, context))
.forEach(dataStreamSink::addConjunct);
@@ -1265,24 +1269,28 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
PlanNode planNode = inputFragment.getPlanRoot();
- Plan child = filter.child();
- while (child instanceof PhysicalLimit) {
- child = ((PhysicalLimit<?>) child).child();
- }
- if (planNode instanceof ExchangeNode || planNode instanceof SortNode
|| planNode instanceof UnionNode
- // this means we have filter->limit->project, need a SelectNode
- || child instanceof PhysicalProject) {
- // the three nodes don't support conjuncts, need create a
SelectNode to filter data
+ // the three nodes don't support conjuncts, need create a SelectNode
to filter data
+ if (planNode instanceof ExchangeNode || planNode instanceof SortNode
|| planNode instanceof UnionNode) {
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(),
planNode);
selectNode.setNereidsId(filter.getId());
addConjunctsToPlanNode(filter, selectNode, context);
addPlanRoot(inputFragment, selectNode, filter);
} else {
if (!(filter.child(0) instanceof AbstractPhysicalJoin)) {
+ // already have filter on this node, we should not override
it, so need a new node
+ if (!planNode.getConjuncts().isEmpty()
+ // already have project on this node, filter need
execute after project, so need a new node
+ ||
CollectionUtils.isNotEmpty(planNode.getProjectList())
+ // already have limit on this node, filter need
execute after limit, so need a new node
+ || planNode.hasLimit()) {
+ planNode = new SelectNode(context.nextPlanNodeId(),
planNode);
+ planNode.setNereidsId(filter.getId());
+ addPlanRoot(inputFragment, planNode, filter);
+ }
addConjunctsToPlanNode(filter, planNode, context);
- updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(),
filter);
}
}
+ updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter);
// in ut, filter.stats may be null
if (filter.getStats() != null) {
inputFragment.getPlanRoot().setCardinalityAfterFilter((long)
filter.getStats().getRowCount());
@@ -1866,8 +1874,15 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
PlanFragment inputFragment = project.child(0).accept(this, context);
-
PlanNode inputPlanNode = inputFragment.getPlanRoot();
+ // this means already have project on this node, filter need execute
after project, so need a new node
+ if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) {
+ SelectNode selectNode = new SelectNode(context.nextPlanNodeId(),
inputPlanNode);
+ selectNode.setNereidsId(project.getId());
+ addPlanRoot(inputFragment, selectNode, project);
+ inputPlanNode = selectNode;
+ }
+
List<Expr> projectionExprs = null;
List<Expr> allProjectionExprs = Lists.newArrayList();
List<Slot> slots = null;
@@ -1905,6 +1920,11 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
MultiCastDataSink multiCastDataSink = (MultiCastDataSink)
inputFragment.getSink();
DataStreamSink dataStreamSink =
multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
+ if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
+ String errMsg = "generate invalid plan \n" +
project.treeString();
+ LOG.warn(errMsg);
+ throw new AnalysisException(errMsg);
+ }
TupleDescriptor projectionTuple = generateTupleDesc(slots, null,
context);
dataStreamSink.setProjections(projectionExprs);
dataStreamSink.setOutputTupleDesc(projectionTuple);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]