This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 032b9c9c2d fix: impl ordering for serialization/deserialization for AggregateUdf (#11926)
032b9c9c2d is described below

commit 032b9c9c2d4dbbe7c65cc8122fb23258860ce148
Author: Huaijin <haohuai...@gmail.com>
AuthorDate: Mon Aug 12 18:33:58 2024 +0800

    fix: impl ordering for serialization/deserialization for AggregateUdf (#11926)
    
    * fix: support ordering and percentile function ser/der
    
    * add more test case
---
 .../core/src/physical_optimizer/test_utils.rs      |  1 -
 datafusion/core/src/physical_planner.rs            |  1 -
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |  4 --
 .../src/windows/bounded_window_agg_exec.rs         |  6 +-
 datafusion/physical-plan/src/windows/mod.rs        |  6 +-
 datafusion/proto/src/physical_plan/from_proto.rs   |  3 -
 datafusion/proto/src/physical_plan/mod.rs          |  6 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   | 66 ++++++++++++++++++++++
 8 files changed, 71 insertions(+), 22 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs 
b/datafusion/core/src/physical_optimizer/test_utils.rs
index 55a0fa8145..90853c3476 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -251,7 +251,6 @@ pub fn bounded_window_exec(
                 "count".to_owned(),
                 &[col(col_name, &schema).unwrap()],
                 &[],
-                &[],
                 &sort_exprs,
                 Arc::new(WindowFrame::new(Some(false))),
                 schema.as_ref(),
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 7eb468f56e..9cc2f253f8 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1510,7 +1510,6 @@ pub fn create_window_expr_with_name(
                 fun,
                 name,
                 &physical_args,
-                args,
                 &partition_by,
                 &order_by,
                 window_frame,
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 813862c4cc..d75d8e4337 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -253,7 +253,6 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
 
     let partitionby_exprs = vec![];
     let orderby_exprs = vec![];
-    let logical_exprs = vec![];
     // Window frame starts with "UNBOUNDED PRECEDING":
     let start_bound = WindowFrameBound::Preceding(ScalarValue::UInt64(None));
 
@@ -285,7 +284,6 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
                     &window_fn,
                     fn_name.to_string(),
                     &args,
-                    &logical_exprs,
                     &partitionby_exprs,
                     &orderby_exprs,
                     Arc::new(window_frame),
@@ -674,7 +672,6 @@ async fn run_window_test(
             &window_fn,
             fn_name.clone(),
             &args,
-            &[],
             &partitionby_exprs,
             &orderby_exprs,
             Arc::new(window_frame.clone()),
@@ -693,7 +690,6 @@ async fn run_window_test(
             &window_fn,
             fn_name,
             &args,
-            &[],
             &partitionby_exprs,
             &orderby_exprs,
             Arc::new(window_frame.clone()),
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 6311107f7b..29ead35895 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1196,7 +1196,7 @@ mod tests {
         RecordBatchStream, SendableRecordBatchStream, TaskContext,
     };
     use datafusion_expr::{
-        Expr, WindowFrame, WindowFrameBound, WindowFrameUnits, 
WindowFunctionDefinition,
+        WindowFrame, WindowFrameBound, WindowFrameUnits, 
WindowFunctionDefinition,
     };
     use datafusion_functions_aggregate::count::count_udaf;
     use datafusion_physical_expr::expressions::{col, Column, NthValue};
@@ -1303,10 +1303,7 @@ mod tests {
         let window_fn = WindowFunctionDefinition::AggregateUDF(count_udaf());
         let col_expr =
             Arc::new(Column::new(schema.fields[0].name(), 0)) as Arc<dyn 
PhysicalExpr>;
-        let log_expr =
-            
Expr::Column(datafusion_common::Column::from(schema.fields[0].name()));
         let args = vec![col_expr];
-        let log_args = vec![log_expr];
         let partitionby_exprs = vec![col(hash, &schema)?];
         let orderby_exprs = vec![PhysicalSortExpr {
             expr: col(order_by, &schema)?,
@@ -1327,7 +1324,6 @@ mod tests {
                 &window_fn,
                 fn_name,
                 &args,
-                &log_args,
                 &partitionby_exprs,
                 &orderby_exprs,
                 Arc::new(window_frame.clone()),
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index 2e6ad4e1a1..1fd0ca36b1 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -32,8 +32,8 @@ use arrow::datatypes::Schema;
 use arrow_schema::{DataType, Field, SchemaRef};
 use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::{
-    BuiltInWindowFunction, Expr, PartitionEvaluator, WindowFrame,
-    WindowFunctionDefinition, WindowUDF,
+    BuiltInWindowFunction, PartitionEvaluator, WindowFrame, 
WindowFunctionDefinition,
+    WindowUDF,
 };
 use datafusion_physical_expr::equivalence::collapse_lex_req;
 use datafusion_physical_expr::{
@@ -94,7 +94,6 @@ pub fn create_window_expr(
     fun: &WindowFunctionDefinition,
     name: String,
     args: &[Arc<dyn PhysicalExpr>],
-    _logical_args: &[Expr],
     partition_by: &[Arc<dyn PhysicalExpr>],
     order_by: &[PhysicalSortExpr],
     window_frame: Arc<WindowFrame>,
@@ -746,7 +745,6 @@ mod tests {
                 &[col("a", &schema)?],
                 &[],
                 &[],
-                &[],
                 Arc::new(WindowFrame::new(None)),
                 schema.as_ref(),
                 false,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index bc0a19336b..b2f92f4b2e 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -169,13 +169,10 @@ pub fn parse_physical_window_expr(
     // TODO: Remove extended_schema if functions are all UDAF
     let extended_schema =
         schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
-    // approx_percentile_cont and approx_percentile_cont_weight are not 
supported for UDAF from protobuf yet.
-    let logical_exprs = &[];
     create_window_expr(
         &fun,
         name,
         &window_node_expr,
-        logical_exprs,
         &partition_by,
         &order_by,
         Arc::new(window_frame),
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index b5d28f40a6..0f6722dd37 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -477,7 +477,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             ExprType::AggregateExpr(agg_node) => {
                                 let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> 
= agg_node.expr.iter()
                                     .map(|e| parse_physical_expr(e, registry, 
&physical_schema, extension_codec)).collect::<Result<Vec<_>>>()?;
-                                let _ordering_req: Vec<PhysicalSortExpr> = 
agg_node.ordering_req.iter()
+                                let ordering_req: Vec<PhysicalSortExpr> = 
agg_node.ordering_req.iter()
                                     .map(|e| parse_physical_sort_expr(e, 
registry, &physical_schema, extension_codec)).collect::<Result<Vec<_>>>()?;
                                 
agg_node.aggregate_function.as_ref().map(|func| {
                                     match func {
@@ -487,14 +487,12 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                                                 None => 
registry.udaf(udaf_name)?
                                             };
 
-                                            // TODO: approx_percentile_cont 
and approx_percentile_cont_weight are not supported for UDAF from protobuf yet.
-                                            // TODO: `order by` is not 
supported for UDAF yet
-                                            // 
https://github.com/apache/datafusion/issues/11804
                                             AggregateExprBuilder::new(agg_udf, 
input_phy_expr)
                                                 
.schema(Arc::clone(&physical_schema))
                                                 .alias(name)
                                                 
.with_ignore_nulls(agg_node.ignore_nulls)
                                                 
.with_distinct(agg_node.distinct)
+                                                .order_by(ordering_req)
                                                 .build()
                                         }
                                     }
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 1a9c6d40eb..6766468ef4 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -25,6 +25,8 @@ use std::vec;
 use arrow::array::RecordBatch;
 use arrow::csv::WriterBuilder;
 use 
datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder;
+use 
datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf;
+use datafusion_functions_aggregate::array_agg::array_agg_udaf;
 use datafusion_functions_aggregate::min_max::max_udaf;
 use prost::Message;
 
@@ -412,6 +414,70 @@ fn rountrip_aggregate_with_limit() -> Result<()> {
     roundtrip_test(Arc::new(agg))
 }
 
+#[test]
+fn rountrip_aggregate_with_approx_pencentile_cont() -> Result<()> {
+    let field_a = Field::new("a", DataType::Int64, false);
+    let field_b = Field::new("b", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+    let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+        vec![(col("a", &schema)?, "unused".to_string())];
+
+    let aggregates: Vec<Arc<dyn AggregateExpr>> = 
vec![AggregateExprBuilder::new(
+        approx_percentile_cont_udaf(),
+        vec![col("b", &schema)?, lit(0.5)],
+    )
+    .schema(Arc::clone(&schema))
+    .alias("APPROX_PERCENTILE_CONT(b, 0.5)")
+    .build()?];
+
+    let agg = AggregateExec::try_new(
+        AggregateMode::Final,
+        PhysicalGroupBy::new_single(groups.clone()),
+        aggregates.clone(),
+        vec![None],
+        Arc::new(EmptyExec::new(schema.clone())),
+        schema,
+    )?;
+    roundtrip_test(Arc::new(agg))
+}
+
+#[test]
+fn rountrip_aggregate_with_sort() -> Result<()> {
+    let field_a = Field::new("a", DataType::Int64, false);
+    let field_b = Field::new("b", DataType::Int64, false);
+    let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+
+    let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
+        vec![(col("a", &schema)?, "unused".to_string())];
+    let sort_exprs = vec![PhysicalSortExpr {
+        expr: col("b", &schema)?,
+        options: SortOptions {
+            descending: false,
+            nulls_first: true,
+        },
+    }];
+
+    let aggregates: Vec<Arc<dyn AggregateExpr>> =
+        vec![
+            AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", 
&schema)?])
+                .schema(Arc::clone(&schema))
+                .alias("ARRAY_AGG(b)")
+                .order_by(sort_exprs)
+                .build()?,
+        ];
+
+    let agg = AggregateExec::try_new(
+        AggregateMode::Final,
+        PhysicalGroupBy::new_single(groups.clone()),
+        aggregates.clone(),
+        vec![None],
+        Arc::new(EmptyExec::new(schema.clone())),
+        schema,
+    )?;
+    roundtrip_test(Arc::new(agg))
+}
+
 #[test]
 fn roundtrip_aggregate_udaf() -> Result<()> {
     let field_a = Field::new("a", DataType::Int64, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to