This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new b3fd4591 fix: disable percentile for distributed tables (#1406)
b3fd4591 is described below

commit b3fd4591d7dfe982b40ad2a4985916411647bed2
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Dec 27 17:51:08 2023 +0800

    fix: disable percentile for distributed tables (#1406)
    
    ## Rationale
    See #1405
    
    ## Detailed Changes
    Disable percentile functions
    
    ## Test Plan
    CI
---
 df_engine_extensions/src/dist_sql_query/physical_plan.rs    | 13 +++++++++++++
 .../cases/env/cluster/ddl/partition_table.result            | 12 ++++++++++++
 integration_tests/cases/env/cluster/ddl/partition_table.sql |  8 ++++++++
 3 files changed, 33 insertions(+)

diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index e64ae96c..9825227c 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -33,6 +33,7 @@ use datafusion::{
         coalesce_batches::CoalesceBatchesExec,
         coalesce_partitions::CoalescePartitionsExec,
         displayable,
+        expressions::{ApproxPercentileCont, ApproxPercentileContWithWeight},
         filter::FilterExec,
         metrics::{Count, MetricValue, MetricsSet},
         projection::ProjectionExec,
@@ -619,8 +620,20 @@ pub enum PushDownEvent {
 }
 
 impl PushDownEvent {
+    // Those aggregate functions can't be pushed down.
+    // https://github.com/apache/incubator-horaedb/issues/1405
+    fn blacklist_expr(expr: &dyn Any) -> bool {
+        expr.is::<ApproxPercentileCont>() || expr.is::<ApproxPercentileContWithWeight>()
+    }
+
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
         if let Some(aggr) = plan.as_any().downcast_ref::<AggregateExec>() {
+            for aggr_expr in aggr.aggr_expr() {
+                if Self::blacklist_expr(aggr_expr.as_any()) {
+                    return Self::Unable;
+                }
+            }
+
             if *aggr.mode() == AggregateMode::Partial {
                 Self::Terminated(plan)
             } else {
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 623be786..81502d1c 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -169,6 +169,18 @@ tsid,t,name,id,value,
 UInt64(12677620772014847982),Timestamp(1651737067000),String("ceresdb5"),Int32(0),Double(105.0),
 
 
+SELECT
+    time_bucket (t, "PT1M") AS ts,
+    approx_percentile_cont (value, 0.9) AS value
+FROM
+    random_partition_table_t
+GROUP BY
+    time_bucket (t, "PT1M");
+
+ts,value,
+Timestamp(1651737060000),Double(109.4),
+
+
 DROP TABLE IF EXISTS `random_partition_table_t`;
 
 affected_rows: 0
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 76fc3986..59a1dd2a 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -82,6 +82,14 @@ SELECT * from random_partition_table_t where name = "ceresdb0";
 
 SELECT * from random_partition_table_t where name = "ceresdb5";
 
+SELECT
+    time_bucket (t, "PT1M") AS ts,
+    approx_percentile_cont (value, 0.9) AS value
+FROM
+    random_partition_table_t
+GROUP BY
+    time_bucket (t, "PT1M");
+
 DROP TABLE IF EXISTS `random_partition_table_t`;
 
 SHOW CREATE TABLE random_partition_table_t;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to