This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c03ed2f897bfe38540fe27e3c7c5019c07efd1f4
Author: starocean999 <[email protected]>
AuthorDate: Wed Oct 11 16:32:09 2023 +0800

    [fix](nereids)disable parallel scan in some case (#25089)
---
 .../glue/translator/PhysicalPlanTranslator.java    | 31 ++++++++++++
 .../org/apache/doris/planner/PlanFragment.java     |  2 +
 .../data/performance_p0/redundant_conjuncts.out    |  4 ++
 .../nereids_p0/test_disable_parallel_scan.groovy   | 56 ++++++++++++++++++++++
 4 files changed, 93 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 049619eb209..fe52493f64f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -63,6 +63,7 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated;
 import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
 import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
 import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.properties.PhysicalProperties;
 import 
org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
 import org.apache.doris.nereids.trees.UnaryNode;
@@ -1515,6 +1516,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         PartitionSortNode partitionSortNode = translatePartitionSortNode(
                 partitionTopN, inputFragment.getPlanRoot(), context);
         addPlanRoot(inputFragment, partitionSortNode, partitionTopN);
+        // The pipeline engine uses parallel scan by default, which breaks the data distribution rule,
+        // so parallel scan must be turned off here to get correct results.
+        // TODO: temporary workaround; nereids forbids parallel scan under partition top-n
+        if 
(findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) {
+            inputFragment.setHasColocatePlanNode(true);
+        }
         return inputFragment;
     }
 
@@ -1715,6 +1722,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             setPlanRoot(setOperationFragment, setOperationNode, setOperation);
         }
 
+        // The pipeline engine uses parallel scan by default, which breaks the data distribution rule,
+        // so parallel scan must be turned off here to get correct results.
+        // TODO: temporary workaround; nereids forbids all parallel scan under PhysicalSetOperation
+        if 
(!setOperation.getPhysicalProperties().equals(PhysicalProperties.ANY)
+                && 
findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot())) 
{
+            setOperationFragment.setHasColocatePlanNode(true);
+        }
+
         return setOperationFragment;
     }
 
@@ -1972,6 +1987,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                 bufferedTupleDesc
         );
         inputPlanFragment.addPlanRoot(analyticEvalNode);
+
+        // The pipeline engine uses parallel scan by default, which breaks the data distribution rule,
+        // so parallel scan must be turned off here to get correct results.
+        // TODO: temporary workaround; nereids forbids parallel scan under analytic (window) nodes
+        if 
(findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
+            inputPlanFragment.setHasColocatePlanNode(true);
+        }
         return inputPlanFragment;
     }
 
@@ -2336,4 +2358,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             return getCTEConsumerChild((PhysicalPlan) root.child(0));
         }
     }
+
+    /** Returns true if an OlapScanNode is reachable from root without passing through a join or exchange node. */
+    private boolean findOlapScanNodesByPassExchangeAndJoinNode(PlanNode root) {
+        if (root instanceof OlapScanNode) {
+            return true;
+        }
+        return !(root instanceof JoinNodeBase || root instanceof ExchangeNode)
+                && root.getChildren().stream().anyMatch(this::findOlapScanNodesByPassExchangeAndJoinNode);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index cf596b556f9..471eccf14bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -319,6 +319,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         }
         str.append("\n");
         str.append("  PARTITION: " + 
dataPartition.getExplainString(explainLevel) + "\n");
+        str.append("  HAS_COLO_PLAN_NODE: " + hasColocatePlanNode + "\n");
+        str.append("\n");
         if (sink != null) {
             str.append(sink.getExplainString("  ", explainLevel) + "\n");
         }
diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out
index f82e0c94536..4c432cf6d2b 100644
--- a/regression-test/data/performance_p0/redundant_conjuncts.out
+++ b/regression-test/data/performance_p0/redundant_conjuncts.out
@@ -5,6 +5,8 @@ PLAN FRAGMENT 0
     `v1`
   PARTITION: HASH_PARTITIONED: 
`default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1`
 
+  HAS_COLO_PLAN_NODE: false
+
   VRESULT SINK
 
   0:VOlapScanNode
@@ -20,6 +22,8 @@ PLAN FRAGMENT 0
     `v1`
   PARTITION: HASH_PARTITIONED: 
`default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1`
 
+  HAS_COLO_PLAN_NODE: false
+
   VRESULT SINK
 
   0:VOlapScanNode
diff --git a/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy b/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy
new file mode 100644
index 00000000000..24c1b256c47
--- /dev/null
+++ b/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+ // or more contributor license agreements.  See the NOTICE file
+ // distributed with this work for additional information
+ // regarding copyright ownership.  The ASF licenses this file
+ // to you under the Apache License, Version 2.0 (the
+ // "License"); you may not use this file except in compliance
+ // with the License.  You may obtain a copy of the License at
+ //
+ //   http://www.apache.org/licenses/LICENSE-2.0
+ //
+ // Unless required by applicable law or agreed to in writing,
+ // software distributed under the License is distributed on an
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ // KIND, either express or implied.  See the License for the
+ // specific language governing permissions and limitations
+ // under the License.
+
+ suite("test_disable_parallel_scan") { // regression test for #25089: a window function over an OLAP scan
+     sql "SET enable_nereids_planner=true" // plan with the Nereids planner
+     sql "SET enable_fallback_to_original_planner=false" // do not fall back to the original planner
+     sql """drop table if exists sequence_count_test3;"""
+     sql """CREATE TABLE sequence_count_test3(
+                    `uid` int COMMENT 'user id',
+                    `date` datetime COMMENT 'date time', 
+                    `number` int NULL COMMENT 'number' 
+                            )
+            DUPLICATE KEY(uid) 
+            DISTRIBUTED BY HASH(uid) BUCKETS 1 
+            PROPERTIES ( 
+                "replication_num" = "1"
+            );"""
+     explain { // lag() over (PARTITION BY uid ...) reading directly from the OLAP table
+        sql("""
+            SELECT
+                uid,
+                DATE,
+                e.NUMBER AS EVENT_GROUP,
+            CASE
+                    
+                    WHEN (
+                        (
+                            UNIX_TIMESTAMP( DATE ) - (
+                                lag ( UNIX_TIMESTAMP( DATE ), 1, 0 ) over ( 
PARTITION BY uid ORDER BY DATE  ) 
+                            ) 
+                        ) > 600 
+                        ) THEN
+                        1 ELSE 0 
+                    END AS SESSION_FLAG 
+                FROM
+                    sequence_count_test3 e 
+        """)
+        contains "HAS_COLO_PLAN_NODE: true" // the fragment must be flagged so parallel scan is turned off
+     }
+
+     sql """drop table if exists sequence_count_test3;""" // clean up
+ }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to