This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fc42a5fa4f [fix](Nereids) handle continuous filter or project in plan 
(#40176)
7fc42a5fa4f is described below

commit 7fc42a5fa4fd9bcc2259fde6e0d1ba9c49efec6f
Author: morrySnow <[email protected]>
AuthorDate: Tue Sep 10 15:46:59 2024 +0800

    [fix](Nereids) handle continuous filter or project in plan (#40176)
    
    If we meet consecutive project or filter nodes in the translator, we try to
    generate a SelectNode as far as possible to avoid generating an invalid plan.
    
    For example:
    
    ```
    Filter(conjuncts 1)
    +-- Limit (limit 10)
        +-- Filter(conjuncts 2)
            +-- Aggregate
    ```
    
    Will be translated to:
    
    ```
    SELECT_NODE (conjuncts 1)
    +-- AGGREGATE_NODE (conjuncts 2) (limit 10)
    ```
---
 .../glue/translator/PhysicalPlanTranslator.java    | 44 ++++++++++++++++------
 1 file changed, 32 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 2a34bc3ca91..28456041f7d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -720,7 +720,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         JdbcScanNode jdbcScanNode = new JdbcScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
                 table instanceof JdbcExternalTable);
         jdbcScanNode.setNereidsId(jdbcScan.getId());
-        
jdbcScanNode.addConjuncts(translateToLegacyConjuncts(jdbcScan.getConjuncts()));
         Utils.execWithUncheckedException(jdbcScanNode::init);
         context.addScanNode(jdbcScanNode, jdbcScan);
         context.getRuntimeTranslator().ifPresent(
@@ -744,7 +743,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         OdbcScanNode odbcScanNode = new OdbcScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
                 (OdbcTable) table);
         odbcScanNode.setNereidsId(odbcScan.getId());
-        
odbcScanNode.addConjuncts(translateToLegacyConjuncts(odbcScan.getConjuncts()));
         Utils.execWithUncheckedException(odbcScanNode::init);
         context.addScanNode(odbcScanNode, odbcScan);
         context.getRuntimeTranslator().ifPresent(
@@ -1258,6 +1256,12 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
             DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
                     multiCastDataSink.getDataStreamSinks().size() - 1);
+            if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts())
+                    || 
CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
+                String errMsg = "generate invalid plan \n" + 
filter.treeString();
+                LOG.warn(errMsg);
+                throw new AnalysisException(errMsg);
+            }
             filter.getConjuncts().stream()
                     .map(e -> ExpressionTranslator.translate(e, context))
                     .forEach(dataStreamSink::addConjunct);
@@ -1265,24 +1269,28 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
 
         PlanNode planNode = inputFragment.getPlanRoot();
-        Plan child = filter.child();
-        while (child instanceof PhysicalLimit) {
-            child = ((PhysicalLimit<?>) child).child();
-        }
-        if (planNode instanceof ExchangeNode || planNode instanceof SortNode 
|| planNode instanceof UnionNode
-                // this means we have filter->limit->project, need a SelectNode
-                || child instanceof PhysicalProject) {
-            // the three nodes don't support conjuncts, need create a 
SelectNode to filter data
+        // the three nodes don't support conjuncts, need create a SelectNode 
to filter data
+        if (planNode instanceof ExchangeNode || planNode instanceof SortNode 
|| planNode instanceof UnionNode) {
             SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), 
planNode);
             selectNode.setNereidsId(filter.getId());
             addConjunctsToPlanNode(filter, selectNode, context);
             addPlanRoot(inputFragment, selectNode, filter);
         } else {
             if (!(filter.child(0) instanceof AbstractPhysicalJoin)) {
+                // already have filter on this node, we should not override 
it, so need a new node
+                if (!planNode.getConjuncts().isEmpty()
+                        // already have project on this node, filter need 
execute after project, so need a new node
+                        || 
CollectionUtils.isNotEmpty(planNode.getProjectList())
+                        // already have limit on this node, filter need 
execute after limit, so need a new node
+                        || planNode.hasLimit()) {
+                    planNode = new SelectNode(context.nextPlanNodeId(), 
planNode);
+                    planNode.setNereidsId(filter.getId());
+                    addPlanRoot(inputFragment, planNode, filter);
+                }
                 addConjunctsToPlanNode(filter, planNode, context);
-                updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), 
filter);
             }
         }
+        updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), filter);
         // in ut, filter.stats may be null
         if (filter.getStats() != null) {
             inputFragment.getPlanRoot().setCardinalityAfterFilter((long) 
filter.getStats().getRowCount());
@@ -1866,8 +1874,15 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
 
         PlanFragment inputFragment = project.child(0).accept(this, context);
-
         PlanNode inputPlanNode = inputFragment.getPlanRoot();
+        // this means already have project on this node, filter need execute 
after project, so need a new node
+        if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) {
+            SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), 
inputPlanNode);
+            selectNode.setNereidsId(project.getId());
+            addPlanRoot(inputFragment, selectNode, project);
+            inputPlanNode = selectNode;
+        }
+
         List<Expr> projectionExprs = null;
         List<Expr> allProjectionExprs = Lists.newArrayList();
         List<Slot> slots = null;
@@ -1905,6 +1920,11 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
             DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
                     multiCastDataSink.getDataStreamSinks().size() - 1);
+            if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
+                String errMsg = "generate invalid plan \n" + 
project.treeString();
+                LOG.warn(errMsg);
+                throw new AnalysisException(errMsg);
+            }
             TupleDescriptor projectionTuple = generateTupleDesc(slots, null, 
context);
             dataStreamSink.setProjections(projectionExprs);
             dataStreamSink.setOutputTupleDesc(projectionTuple);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to