This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ae83a838d06 [nereids] config global partition topn (#31476)
ae83a838d06 is described below

commit ae83a838d06c6e4a7db03594cb968bf8e118199b
Author: xzj7019 <131111794+xzj7...@users.noreply.github.com>
AuthorDate: Wed Feb 28 18:47:11 2024 +0800

    [nereids] config global partition topn (#31476)
    
    * [nereids] config global partition topn
    
    * [nereids] config global partition topn
    
    ---------
    
    Co-authored-by: zhongjian.xzj 
<zhongjian.xzj@zhongjianxzjdeMacBook-Pro.local>
---
 ...ogicalPartitionTopNToPhysicalPartitionTopN.java | 70 +++++++++++++++++++-
 .../java/org/apache/doris/qe/SessionVariable.java  | 13 ++++
 .../limit_push_down/limit_push_down.out            |  2 +-
 .../push_filter_through_ptopn.out                  |  2 +-
 .../noStatsRfPrune/query67.out                     |  2 +-
 .../no_stats_shape/query67.out                     |  2 +-
 .../explain/test_global_partition_topn_plan.groovy | 76 ++++++++++++++++++++++
 7 files changed, 162 insertions(+), 5 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java
index 2b6fcfe464d..410e68f3ec2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalPartitionTopNToPhysicalPartitionTopN.java
@@ -17,18 +17,26 @@
 
 package org.apache.doris.nereids.rules.implementation;
 
+import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.OrderExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.Statistics;
 
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Implementation rule that convert logical partition-top-n to physical 
partition-top-n.
@@ -42,7 +50,8 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN 
extends OneImplementati
 
     private List<PhysicalPartitionTopN<? extends Plan>> 
generatePhysicalPartitionTopn(
             LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
-        if (logicalPartitionTopN.getPartitionKeys().isEmpty()) {
+        if (logicalPartitionTopN.getPartitionKeys().isEmpty()
+                || !checkTwoPhaseGlobalPartitionTopn(logicalPartitionTopN)) {
             // if no partition by keys, use local partition topn combined with 
further full sort
             List<OrderKey> orderKeys = 
!logicalPartitionTopN.getOrderKeys().isEmpty()
                     ? logicalPartitionTopN.getOrderKeys().stream()
@@ -99,6 +108,65 @@ public class LogicalPartitionTopNToPhysicalPartitionTopN 
extends OneImplementati
         }
     }
 
+    /**
+     * Decide whether the two-phase (local + global) partition topn should be generated.
+     *
+     * <p>The average number of rows per partition is estimated as {@code rowCount / maxNdv},
+     * where {@code maxNdv} is the largest ndv among the partition keys. If the partition keys'
+     * ndv is close to the total row count (the estimate is below the session variable
+     * {@code global_partition_topn_threshold}), two-phase global partition topn is not suitable.
+     *
+     * @param logicalPartitionTopN the logical partition topn being implemented
+     * @return true if the group statistics are available, every partition key is a base-table
+     *         column with known, positive and finite ndv, and the estimate reaches the threshold;
+     *         false otherwise (including when there is no {@link ConnectContext})
+     */
+    private boolean checkTwoPhaseGlobalPartitionTopn(LogicalPartitionTopN<? extends Plan> logicalPartitionTopN) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
+            // no session to read the threshold from: fall back instead of throwing NPE
+            return false;
+        }
+        if (!logicalPartitionTopN.getGroupExpression().isPresent()) {
+            return false;
+        }
+        Group group = logicalPartitionTopN.getGroupExpression().get().getOwnerGroup();
+        if (group == null || group.getStatistics() == null) {
+            return false;
+        }
+        List<Expression> partitionKeys = logicalPartitionTopN.getPartitionKeys();
+        // an empty key list would leave no ndv to divide by
+        if (partitionKeys.isEmpty() || !checkPartitionKeys(partitionKeys)) {
+            return false;
+        }
+        Statistics stats = group.getStatistics();
+        // every partition key must have known column statistics
+        List<ColumnStatistic> partitionByKeyStats = partitionKeys.stream()
+                .map(stats::findColumnStatistics)
+                .filter(Objects::nonNull)
+                .filter(e -> !e.isUnKnown)
+                .collect(Collectors.toList());
+        if (partitionByKeyStats.size() != partitionKeys.size()) {
+            return false;
+        }
+        // ... and a positive, finite ndv (NaN is rejected by the `> 0` test as well)
+        List<Double> ndvs = partitionByKeyStats.stream()
+                .map(s -> s.ndv)
+                .filter(ndv -> ndv > 0 && !Double.isInfinite(ndv))
+                .collect(Collectors.toList());
+        if (ndvs.size() != partitionByKeyStats.size()) {
+            return false;
+        }
+        // ndvs is non-empty here: partitionKeys is non-empty and no key was dropped above
+        double maxNdv = ndvs.stream().max(Double::compare).get();
+        double globalPartitionTopnThreshold = connectContext.getSessionVariable()
+                .getGlobalPartitionTopNThreshold();
+        return stats.getRowCount() / maxNdv >= globalPartitionTopnThreshold;
+    }
+
+    /**
+     * Global partition topn only takes effect if all partition keys are columns of a base table.
+     *
+     * @param partitionKeys partition-by expressions of the partition topn
+     * @return true iff every key is a {@link SlotReference} whose column and table are both present
+     *         (vacuously true for an empty list)
+     */
+    private boolean checkPartitionKeys(List<Expression> partitionKeys) {
+        return partitionKeys.stream().allMatch(key -> key instanceof SlotReference
+                && ((SlotReference) key).getColumn().isPresent()
+                && ((SlotReference) key).getTable().isPresent());
+    }
+
     private ImmutableList<OrderKey> getAllOrderKeys(LogicalPartitionTopN<? 
extends Plan> logicalPartitionTopN) {
         ImmutableList.Builder<OrderKey> builder = ImmutableList.builder();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 6c0fecf0d34..046d6d1a92e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -209,6 +209,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String MAX_JOIN_NUMBER_BUSHY_TREE = 
"max_join_number_bushy_tree";
     public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
 
+    // name of the session variable backing globalPartitionTopNThreshold
+    public static final String GLOBAL_PARTITION_TOPN_THRESHOLD = 
"global_partition_topn_threshold";
+
     public static final String ENABLE_INFER_PREDICATE = 
"enable_infer_predicate";
 
     public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000;
@@ -1027,6 +1029,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN)
     private boolean enablePartitionTopN = true;
 
+    // Minimum estimated rows per partition (row count / max partition-key ndv) required to use
+    // two-phase global partition topn; compared in LogicalPartitionTopNToPhysicalPartitionTopN.
+    @VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD)
+    private double globalPartitionTopNThreshold = 100;
+
     @VariableMgr.VarAttr(name = ENABLE_INFER_PREDICATE)
     private boolean enableInferPredicate = true;
 
@@ -2533,6 +2538,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enablePartitionTopN = enablePartitionTopN;
     }
 
+    /**
+     * Get {@code global_partition_topn_threshold}.
+     */
+    public double getGlobalPartitionTopNThreshold() {
+        return globalPartitionTopNThreshold;
+    }
+
+    /**
+     * Set {@code global_partition_topn_threshold}, the minimum estimated rows per partition
+     * (row count / max partition-key ndv) required for two-phase global partition topn.
+     *
+     * <p>The backing field is a {@code double}, so the parameter is too; existing callers that
+     * pass an {@code int} still compile through widening primitive conversion.
+     */
+    public void setGlobalPartitionTopnThreshold(double threshold) {
+        this.globalPartitionTopNThreshold = threshold;
+    }
+
     public boolean isEnableFoldNondeterministicFn() {
         return enableFoldNondeterministicFn;
     }
diff --git 
a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out 
b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out
index 31ffeee07dd..cbf8a09c7e6 100644
--- a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out
+++ b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out
@@ -389,7 +389,7 @@ PhysicalResultSink
 --PhysicalLimit[GLOBAL]
 ----PhysicalLimit[LOCAL]
 ------PhysicalWindow
---------PhysicalPartitionTopN
+--------PhysicalQuickSort[LOCAL_SORT]
 ----------PhysicalPartitionTopN
 ------------hashAgg[GLOBAL]
 --------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out 
b/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out
index e37f73ef793..dd23fd8ea50 100644
--- a/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out
+++ b/regression-test/data/nereids_syntax_p0/push_filter_through_ptopn.out
@@ -44,7 +44,7 @@ PhysicalResultSink
 ----PhysicalProject
 ------filter((T.b = 2) and (rn <= 2))
 --------PhysicalWindow
-----------PhysicalPartitionTopN
+----------PhysicalQuickSort[LOCAL_SORT]
 ------------PhysicalDistribute[DistributionSpecHash]
 --------------PhysicalPartitionTopN
 ----------------PhysicalProject
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out
index bd3f6458a3c..246d2337283 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query67.out
@@ -6,7 +6,7 @@ PhysicalResultSink
 ------PhysicalTopN[LOCAL_SORT]
 --------filter((rk <= 100))
 ----------PhysicalWindow
-------------PhysicalPartitionTopN
+------------PhysicalQuickSort[LOCAL_SORT]
 --------------PhysicalDistribute[DistributionSpecHash]
 ----------------PhysicalPartitionTopN
 ------------------PhysicalProject
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out
index d069db662b3..900fe97ff05 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query67.out
@@ -6,7 +6,7 @@ PhysicalResultSink
 ------PhysicalTopN[LOCAL_SORT]
 --------filter((rk <= 100))
 ----------PhysicalWindow
-------------PhysicalPartitionTopN
+------------PhysicalQuickSort[LOCAL_SORT]
 --------------PhysicalDistribute[DistributionSpecHash]
 ----------------PhysicalPartitionTopN
 ------------------PhysicalProject
diff --git 
a/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy
 
b/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy
new file mode 100644
index 00000000000..7141ac9fce2
--- /dev/null
+++ 
b/regression-test/suites/nereids_p0/explain/test_global_partition_topn_plan.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_global_partition_topn_plan") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+    sql "DROP TABLE IF EXISTS test_global_partition_topn_plan"
+    sql """ CREATE TABLE `test_global_partition_topn_plan` (
+           c1 int, c2 int, c3 int
+    )ENGINE=OLAP
+    distributed by hash(c1) buckets 10
+    properties(
+        "replication_allocation" = "tag.location.default: 1"
+    );"""
+
+    sql """ alter table test_global_partition_topn_plan modify column c1 set stats('row_count'='52899687', 'ndv'='52899687', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """
+    sql """ alter table test_global_partition_topn_plan modify column c2 set stats('row_count'='52899687', 'ndv'='23622730', 'num_nulls'='0', 'min_value'='1', 'max_value'='52899687', 'data_size'='4'); """
+    sql """ alter table test_global_partition_topn_plan modify column c3 set stats('row_count'='52899687', 'ndv'='2', 'num_nulls'='0', 'min_value'='0', 'max_value'='1', 'data_size'='4'); """
+
+    // Builds the shape-plan query for a row_number() window with the given partition/order keys.
+    def shapeQuery = { String partitionBy, String orderBy ->
+        ("shape plan select rn from (select row_number() over (partition by " + partitionBy
+                + " order by " + orderBy
+                + ") as rn from test_global_partition_topn_plan) tmp where rn <= 100")
+    }
+
+    // row_count / ndv(c2) = 52899687 / 23622730 ~= 2.24 and ndv(c3) = 2. The two-phase global
+    // partition topn is expected when row_count / max key ndv >= threshold; otherwise the plan
+    // falls back to a PhysicalQuickSort. The non-slot key (c2 + c3) always falls back.
+    def cases = [
+        [threshold: 2,   partitionBy: "c2",      orderBy: "c3", expectQuickSort: false],
+        [threshold: 3,   partitionBy: "c2",      orderBy: "c3", expectQuickSort: true],
+        [threshold: 100, partitionBy: "c3",      orderBy: "c2", expectQuickSort: false],
+        [threshold: 2,   partitionBy: "c2, c3",  orderBy: "c1", expectQuickSort: false],
+        [threshold: 3,   partitionBy: "c2, c3",  orderBy: "c1", expectQuickSort: true],
+        [threshold: 2,   partitionBy: "c2 + c3", orderBy: "c1", expectQuickSort: true]
+    ]
+
+    for (testCase in cases) {
+        sql("SET global_partition_topn_threshold=" + testCase.threshold)
+        explain {
+            sql(shapeQuery(testCase.partitionBy, testCase.orderBy))
+            contains "PhysicalPartitionTopN"
+            if (testCase.expectQuickSort) {
+                contains "PhysicalQuickSort"
+            } else {
+                notContains "PhysicalQuickSort"
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to