This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c03ed2f897bfe38540fe27e3c7c5019c07efd1f4 Author: starocean999 <[email protected]> AuthorDate: Wed Oct 11 16:32:09 2023 +0800 [fix](nereids)disable parallel scan in some case (#25089) --- .../glue/translator/PhysicalPlanTranslator.java | 31 ++++++++++++ .../org/apache/doris/planner/PlanFragment.java | 2 + .../data/performance_p0/redundant_conjuncts.out | 4 ++ .../nereids_p0/test_disable_parallel_scan.groovy | 56 ++++++++++++++++++++++ 4 files changed, 93 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 049619eb209..fe52493f64f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -63,6 +63,7 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.properties.DistributionSpecStorageGather; import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.UnaryNode; @@ -1515,6 +1516,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PartitionSortNode partitionSortNode = translatePartitionSortNode( partitionTopN, inputFragment.getPlanRoot(), context); addPlanRoot(inputFragment, partitionSortNode, partitionTopN); + // in the pipeline engine, parallel scan is enabled by default, but it breaks the data distribution rule, + // so we need to turn off parallel scan to get the correct result.
+ // TODO: this temporarily forbids all parallel scans under the partition top-n node + if (findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) { + inputFragment.setHasColocatePlanNode(true); + } return inputFragment; } @@ -1715,6 +1722,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla setPlanRoot(setOperationFragment, setOperationNode, setOperation); } + // in the pipeline engine, parallel scan is enabled by default, but it breaks the data distribution rule, + // so we need to turn off parallel scan to get the correct result. + // TODO: this temporarily forbids all parallel scans under PhysicalSetOperation + if (!setOperation.getPhysicalProperties().equals(PhysicalProperties.ANY) + && findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot())) { + setOperationFragment.setHasColocatePlanNode(true); + } + return setOperationFragment; } @@ -1972,6 +1987,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla bufferedTupleDesc ); inputPlanFragment.addPlanRoot(analyticEvalNode); + + // in the pipeline engine, parallel scan is enabled by default, but it breaks the data distribution rule, + // so we need to turn off parallel scan to get the correct result.
+ // TODO: this temporarily forbids all parallel scans under the analytic (window) node + if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) { + inputPlanFragment.setHasColocatePlanNode(true); + } return inputPlanFragment; } @@ -2336,4 +2358,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return getCTEConsumerChild((PhysicalPlan) root.child(0)); } } + + private boolean findOlapScanNodesByPassExchangeAndJoinNode(PlanNode root) { + if (root instanceof OlapScanNode) { + return true; + } else if (!(root instanceof JoinNodeBase || root instanceof ExchangeNode)) { + return root.getChildren().stream().anyMatch(child -> findOlapScanNodesByPassExchangeAndJoinNode(child)); + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index cf596b556f9..471eccf14bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -319,6 +319,8 @@ public class PlanFragment extends TreeNode<PlanFragment> { } str.append("\n"); str.append(" PARTITION: " + dataPartition.getExplainString(explainLevel) + "\n"); + str.append(" HAS_COLO_PLAN_NODE: " + hasColocatePlanNode + "\n"); + str.append("\n"); if (sink != null) { str.append(sink.getExplainString(" ", explainLevel) + "\n"); } diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out index f82e0c94536..4c432cf6d2b 100644 --- a/regression-test/data/performance_p0/redundant_conjuncts.out +++ b/regression-test/data/performance_p0/redundant_conjuncts.out @@ -5,6 +5,8 @@ PLAN FRAGMENT 0 `v1` PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1` + HAS_COLO_PLAN_NODE: false + VRESULT SINK 0:VOlapScanNode @@ -20,6 +22,8 @@ PLAN FRAGMENT 0 `v1`
PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1` + HAS_COLO_PLAN_NODE: false + VRESULT SINK 0:VOlapScanNode diff --git a/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy b/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy new file mode 100644 index 00000000000..24c1b256c47 --- /dev/null +++ b/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one + // or more contributor license agreements. See the NOTICE file + // distributed with this work for additional information + // regarding copyright ownership. The ASF licenses this file + // to you under the Apache License, Version 2.0 (the + // "License"); you may not use this file except in compliance + // with the License. You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, + // software distributed under the License is distributed on an + // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + // KIND, either express or implied. See the License for the + // specific language governing permissions and limitations + // under the License. 
+ + suite("test_disable_parallel_scan") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql """drop table if exists sequence_count_test3;""" + sql """CREATE TABLE sequence_count_test3( + `uid` int COMMENT 'user id', + `date` datetime COMMENT 'date time', + `number` int NULL COMMENT 'number' + ) + DUPLICATE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + );""" + explain { + sql(""" + SELECT + uid, + DATE, + e.NUMBER AS EVENT_GROUP, + CASE + + WHEN ( + ( + UNIX_TIMESTAMP( DATE ) - ( + lag ( UNIX_TIMESTAMP( DATE ), 1, 0 ) over ( PARTITION BY uid ORDER BY DATE ) + ) + ) > 600 + ) THEN + 1 ELSE 0 + END AS SESSION_FLAG + FROM + sequence_count_test3 e + """) + contains "HAS_COLO_PLAN_NODE: true" + } + + sql """drop table if exists sequence_count_test3;""" + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
