This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 10af8a7366 Improve performance for physical plan creation with many 
columns (#12950)
10af8a7366 is described below

commit 10af8a73662f4f6aac09a34157b7cf5fee034502
Author: Albert Skalt <[email protected]>
AuthorDate: Fri Oct 18 23:41:53 2024 +0300

    Improve performance for physical plan creation with many columns (#12950)
    
    * Add a benchmark for physical plan creation with many aggregates
    
    * Wrap AggregateFunctionExpr with Arc
    
    Patch f5c47fa274d53c1d524a1fb788d9a063bf5240ef removed Arc wrappers for 
AggregateFunctionExpr.
    But this can be inefficient. When the physical optimizer decides to replace a 
node's child with another,
    it clones the node (with `with_new_children`). Assume that node is an 
`AggregateExec` that contains
    hundreds of aggregates; these aggregates are cloned each time.
    
    This patch restores the Arc wrapper so that the AggregateFunctionExpr itself 
is not cloned — only the pointer is.
    
    * Do not build mapping if parent does not require any
    
    This patch adds a small optimization that can soften the edges on
    some queries. If there are no parent requirements, we do not need to
    build the column mapping.
---
 datafusion/core/benches/sql_planner.rs             |  14 +++
 .../src/physical_optimizer/update_aggr_exprs.rs    |  10 +-
 datafusion/core/src/physical_planner.rs            |   5 +-
 datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs |   1 +
 .../combine_partial_final_agg.rs                   |   8 +-
 .../limited_distinct_aggregation.rs                |  16 +--
 datafusion/physical-expr/src/aggregate.rs          |   2 +-
 datafusion/physical-expr/src/utils/mod.rs          |   4 +
 datafusion/physical-expr/src/window/aggregate.rs   |   8 +-
 .../physical-expr/src/window/sliding_aggregate.rs  |  13 ++-
 .../physical-optimizer/src/aggregate_statistics.rs |  24 ++---
 .../src/combine_partial_final_agg.rs               |   2 +-
 datafusion/physical-plan/src/aggregates/mod.rs     | 119 +++++++++++----------
 .../physical-plan/src/aggregates/row_hash.rs       |   4 +-
 datafusion/physical-plan/src/windows/mod.rs        |   5 +-
 datafusion/proto/src/physical_plan/mod.rs          |   3 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |   2 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  44 ++++----
 18 files changed, 165 insertions(+), 119 deletions(-)

diff --git a/datafusion/core/benches/sql_planner.rs 
b/datafusion/core/benches/sql_planner.rs
index 00f6d59167..e7c35c8d86 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -144,6 +144,20 @@ fn criterion_benchmark(c: &mut Criterion) {
         })
     });
 
+    c.bench_function("physical_select_aggregates_from_200", |b| {
+        let mut aggregates = String::new();
+        for i in 0..200 {
+            if i > 0 {
+                aggregates.push_str(", ");
+            }
+            aggregates.push_str(format!("MAX(a{})", i).as_str());
+        }
+        let query = format!("SELECT {} FROM t1", aggregates);
+        b.iter(|| {
+            physical_plan(&ctx, &query);
+        });
+    });
+
     // --- TPC-H ---
 
     let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs 
b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
index c0d9140c02..26cdd65883 100644
--- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -131,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
 /// successfully. Any errors occurring during the conversion process are
 /// passed through.
 fn try_convert_aggregate_if_better(
-    aggr_exprs: Vec<AggregateFunctionExpr>,
+    aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
     prefix_requirement: &[PhysicalSortRequirement],
     eq_properties: &EquivalenceProperties,
-) -> Result<Vec<AggregateFunctionExpr>> {
+) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
     aggr_exprs
         .into_iter()
         .map(|aggr_expr| {
@@ -154,7 +154,7 @@ fn try_convert_aggregate_if_better(
                 let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs);
                 if eq_properties.ordering_satisfy_requirement(&reqs) {
                     // Existing ordering satisfies the aggregator requirements:
-                    aggr_expr.with_beneficial_ordering(true)?
+                    aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
                 } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
                     prefix_requirement,
                     &reverse_aggr_req,
@@ -163,12 +163,14 @@ fn try_convert_aggregate_if_better(
                     // given the existing ordering (if possible):
                     aggr_expr
                         .reverse_expr()
+                        .map(Arc::new)
                         .unwrap_or(aggr_expr)
                         .with_beneficial_ordering(true)?
+                        .map(Arc::new)
                 } else {
                     // There is no beneficial ordering present -- aggregation
                     // will still work albeit in a less efficient mode.
-                    aggr_expr.with_beneficial_ordering(false)?
+                    aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
                 }
                 .ok_or_else(|| {
                     plan_datafusion_err!(
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index cf2a157b04..a4dffd3d02 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1523,7 +1523,7 @@ pub fn create_window_expr(
 }
 
 type AggregateExprWithOptionalArgs = (
-    AggregateFunctionExpr,
+    Arc<AggregateFunctionExpr>,
     // The filter clause, if any
     Option<Arc<dyn PhysicalExpr>>,
     // Ordering requirements, if any
@@ -1587,7 +1587,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                         .alias(name)
                         .with_ignore_nulls(ignore_nulls)
                         .with_distinct(*distinct)
-                        .build()?;
+                        .build()
+                        .map(Arc::new)?;
 
                 (agg_expr, filter, physical_sort_exprs)
             };
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 34061a64d7..ff51282933 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -405,6 +405,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, 
group_by_columns: Vec<&str
                 .schema(Arc::clone(&schema))
                 .alias("sum1")
                 .build()
+                .map(Arc::new)
                 .unwrap(),
         ];
     let expr = group_by_columns
diff --git 
a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs 
b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
index 24e46b3ad9..85076abdaf 100644
--- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
@@ -84,7 +84,7 @@ fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
 fn partial_aggregate_exec(
     input: Arc<dyn ExecutionPlan>,
     group_by: PhysicalGroupBy,
-    aggr_expr: Vec<AggregateFunctionExpr>,
+    aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
 ) -> Arc<dyn ExecutionPlan> {
     let schema = input.schema();
     let n_aggr = aggr_expr.len();
@@ -104,7 +104,7 @@ fn partial_aggregate_exec(
 fn final_aggregate_exec(
     input: Arc<dyn ExecutionPlan>,
     group_by: PhysicalGroupBy,
-    aggr_expr: Vec<AggregateFunctionExpr>,
+    aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
 ) -> Arc<dyn ExecutionPlan> {
     let schema = input.schema();
     let n_aggr = aggr_expr.len();
@@ -130,11 +130,12 @@ fn count_expr(
     expr: Arc<dyn PhysicalExpr>,
     name: &str,
     schema: &Schema,
-) -> AggregateFunctionExpr {
+) -> Arc<AggregateFunctionExpr> {
     AggregateExprBuilder::new(count_udaf(), vec![expr])
         .schema(Arc::new(schema.clone()))
         .alias(name)
         .build()
+        .map(Arc::new)
         .unwrap()
 }
 
@@ -218,6 +219,7 @@ fn aggregations_with_group_combined() -> 
datafusion_common::Result<()> {
             .schema(Arc::clone(&schema))
             .alias("Sum(b)")
             .build()
+            .map(Arc::new)
             .unwrap(),
     ];
     let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
diff --git 
a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs 
b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
index 042f6d6225..d6991711f5 100644
--- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
+++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
@@ -347,10 +347,10 @@ fn test_has_aggregate_expression() -> Result<()> {
     let single_agg = AggregateExec::try_new(
         AggregateMode::Single,
         build_group_by(&schema, vec!["a".to_string()]),
-        vec![agg.count_expr(&schema)], /* aggr_expr */
-        vec![None],                    /* filter_expr */
-        source,                        /* input */
-        schema.clone(),                /* input_schema */
+        vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
+        vec![None],                              /* filter_expr */
+        source,                                  /* input */
+        schema.clone(),                          /* input_schema */
     )?;
     let limit_exec = LocalLimitExec::new(
         Arc::new(single_agg),
@@ -384,10 +384,10 @@ fn test_has_filter() -> Result<()> {
     let single_agg = AggregateExec::try_new(
         AggregateMode::Single,
         build_group_by(&schema.clone(), vec!["a".to_string()]),
-        vec![agg.count_expr(&schema)], /* aggr_expr */
-        vec![filter_expr],             /* filter_expr */
-        source,                        /* input */
-        schema.clone(),                /* input_schema */
+        vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
+        vec![filter_expr],                       /* filter_expr */
+        source,                                  /* input */
+        schema.clone(),                          /* input_schema */
     )?;
     let limit_exec = LocalLimitExec::new(
         Arc::new(single_agg),
diff --git a/datafusion/physical-expr/src/aggregate.rs 
b/datafusion/physical-expr/src/aggregate.rs
index 866596d0b6..6330c24024 100644
--- a/datafusion/physical-expr/src/aggregate.rs
+++ b/datafusion/physical-expr/src/aggregate.rs
@@ -328,7 +328,7 @@ impl AggregateFunctionExpr {
     /// not implement the method, returns an error. Order insensitive and hard
     /// requirement aggregators return `Ok(None)`.
     pub fn with_beneficial_ordering(
-        self,
+        self: Arc<Self>,
         beneficial_ordering: bool,
     ) -> Result<Option<AggregateFunctionExpr>> {
         let Some(updated_fn) = self
diff --git a/datafusion/physical-expr/src/utils/mod.rs 
b/datafusion/physical-expr/src/utils/mod.rs
index 4c37db4849..4bd022975a 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -86,6 +86,10 @@ pub fn map_columns_before_projection(
     parent_required: &[Arc<dyn PhysicalExpr>],
     proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
 ) -> Vec<Arc<dyn PhysicalExpr>> {
+    if parent_required.is_empty() {
+        // No need to build mapping.
+        return vec![];
+    }
     let column_mapping = proj_exprs
         .iter()
         .filter_map(|(expr, name)| {
diff --git a/datafusion/physical-expr/src/window/aggregate.rs 
b/datafusion/physical-expr/src/window/aggregate.rs
index d012fef93b..3fe5d842df 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, 
PhysicalExpr};
 /// See comments on [`WindowExpr`] for more details.
 #[derive(Debug)]
 pub struct PlainAggregateWindowExpr {
-    aggregate: AggregateFunctionExpr,
+    aggregate: Arc<AggregateFunctionExpr>,
     partition_by: Vec<Arc<dyn PhysicalExpr>>,
     order_by: Vec<PhysicalSortExpr>,
     window_frame: Arc<WindowFrame>,
@@ -50,7 +50,7 @@ pub struct PlainAggregateWindowExpr {
 impl PlainAggregateWindowExpr {
     /// Create a new aggregate window function expression
     pub fn new(
-        aggregate: AggregateFunctionExpr,
+        aggregate: Arc<AggregateFunctionExpr>,
         partition_by: &[Arc<dyn PhysicalExpr>],
         order_by: &[PhysicalSortExpr],
         window_frame: Arc<WindowFrame>,
@@ -137,14 +137,14 @@ impl WindowExpr for PlainAggregateWindowExpr {
             let reverse_window_frame = self.window_frame.reverse();
             if reverse_window_frame.start_bound.is_unbounded() {
                 Arc::new(PlainAggregateWindowExpr::new(
-                    reverse_expr,
+                    Arc::new(reverse_expr),
                     &self.partition_by.clone(),
                     &reverse_order_bys(&self.order_by),
                     Arc::new(self.window_frame.reverse()),
                 )) as _
             } else {
                 Arc::new(SlidingAggregateWindowExpr::new(
-                    reverse_expr,
+                    Arc::new(reverse_expr),
                     &self.partition_by.clone(),
                     &reverse_order_bys(&self.order_by),
                     Arc::new(self.window_frame.reverse()),
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs 
b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 143d59eb44..b889ec8c5d 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, 
PhysicalExpr};
 /// See comments on [`WindowExpr`] for more details.
 #[derive(Debug)]
 pub struct SlidingAggregateWindowExpr {
-    aggregate: AggregateFunctionExpr,
+    aggregate: Arc<AggregateFunctionExpr>,
     partition_by: Vec<Arc<dyn PhysicalExpr>>,
     order_by: Vec<PhysicalSortExpr>,
     window_frame: Arc<WindowFrame>,
@@ -50,7 +50,7 @@ pub struct SlidingAggregateWindowExpr {
 impl SlidingAggregateWindowExpr {
     /// Create a new (sliding) aggregate window function expression.
     pub fn new(
-        aggregate: AggregateFunctionExpr,
+        aggregate: Arc<AggregateFunctionExpr>,
         partition_by: &[Arc<dyn PhysicalExpr>],
         order_by: &[PhysicalSortExpr],
         window_frame: Arc<WindowFrame>,
@@ -121,14 +121,14 @@ impl WindowExpr for SlidingAggregateWindowExpr {
             let reverse_window_frame = self.window_frame.reverse();
             if reverse_window_frame.start_bound.is_unbounded() {
                 Arc::new(PlainAggregateWindowExpr::new(
-                    reverse_expr,
+                    Arc::new(reverse_expr),
                     &self.partition_by.clone(),
                     &reverse_order_bys(&self.order_by),
                     Arc::new(self.window_frame.reverse()),
                 )) as _
             } else {
                 Arc::new(SlidingAggregateWindowExpr::new(
-                    reverse_expr,
+                    Arc::new(reverse_expr),
                     &self.partition_by.clone(),
                     &reverse_order_bys(&self.order_by),
                     Arc::new(self.window_frame.reverse()),
@@ -159,7 +159,10 @@ impl WindowExpr for SlidingAggregateWindowExpr {
             })
             .collect::<Vec<_>>();
         Some(Arc::new(SlidingAggregateWindowExpr {
-            aggregate: self.aggregate.with_new_expressions(args, vec![])?,
+            aggregate: self
+                .aggregate
+                .with_new_expressions(args, vec![])
+                .map(Arc::new)?,
             partition_by: partition_bys,
             order_by: new_order_by,
             window_frame: Arc::clone(&self.window_frame),
diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs 
b/datafusion/physical-optimizer/src/aggregate_statistics.rs
index fd21362fd3..27870c7865 100644
--- a/datafusion/physical-optimizer/src/aggregate_statistics.rs
+++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs
@@ -312,7 +312,7 @@ mod tests {
         let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             source,
             Arc::clone(&schema),
@@ -321,7 +321,7 @@ mod tests {
         let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
@@ -342,7 +342,7 @@ mod tests {
         let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             source,
             Arc::clone(&schema),
@@ -351,7 +351,7 @@ mod tests {
         let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
@@ -371,7 +371,7 @@ mod tests {
         let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             source,
             Arc::clone(&schema),
@@ -383,7 +383,7 @@ mod tests {
         let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             Arc::new(coalesce),
             Arc::clone(&schema),
@@ -403,7 +403,7 @@ mod tests {
         let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             source,
             Arc::clone(&schema),
@@ -415,7 +415,7 @@ mod tests {
         let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             Arc::new(coalesce),
             Arc::clone(&schema),
@@ -446,7 +446,7 @@ mod tests {
         let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             filter,
             Arc::clone(&schema),
@@ -455,7 +455,7 @@ mod tests {
         let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
@@ -491,7 +491,7 @@ mod tests {
         let partial_agg = AggregateExec::try_new(
             AggregateMode::Partial,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             filter,
             Arc::clone(&schema),
@@ -500,7 +500,7 @@ mod tests {
         let final_agg = AggregateExec::try_new(
             AggregateMode::Final,
             PhysicalGroupBy::default(),
-            vec![agg.count_expr(&schema)],
+            vec![Arc::new(agg.count_expr(&schema))],
             vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs 
b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
index 4e352e25b5..86f7e73e9e 100644
--- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
+++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -125,7 +125,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate 
{
 
 type GroupExprsRef<'a> = (
     &'a PhysicalGroupBy,
-    &'a [AggregateFunctionExpr],
+    &'a [Arc<AggregateFunctionExpr>],
     &'a [Option<Arc<dyn PhysicalExpr>>],
 );
 
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index 296c5811e5..f36bd920e8 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -351,7 +351,7 @@ pub struct AggregateExec {
     /// Group by expressions
     group_by: PhysicalGroupBy,
     /// Aggregate expressions
-    aggr_expr: Vec<AggregateFunctionExpr>,
+    aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
     /// FILTER (WHERE clause) expression for each aggregate expression
     filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
     /// Set if the output of this aggregation is truncated by a upstream 
sort/limit clause
@@ -378,7 +378,10 @@ impl AggregateExec {
     /// Function used in `OptimizeAggregateOrder` optimizer rule,
     /// where we need parts of the new value, others cloned from the old one
     /// Rewrites aggregate exec with new aggregate expressions.
-    pub fn with_new_aggr_exprs(&self, aggr_expr: Vec<AggregateFunctionExpr>) 
-> Self {
+    pub fn with_new_aggr_exprs(
+        &self,
+        aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
+    ) -> Self {
         Self {
             aggr_expr,
             // clone the rest of the fields
@@ -404,7 +407,7 @@ impl AggregateExec {
     pub fn try_new(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
-        aggr_expr: Vec<AggregateFunctionExpr>,
+        aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
@@ -435,7 +438,7 @@ impl AggregateExec {
     fn try_new_with_schema(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
-        mut aggr_expr: Vec<AggregateFunctionExpr>,
+        mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
@@ -545,7 +548,7 @@ impl AggregateExec {
     }
 
     /// Aggregate expressions
-    pub fn aggr_expr(&self) -> &[AggregateFunctionExpr] {
+    pub fn aggr_expr(&self) -> &[Arc<AggregateFunctionExpr>] {
         &self.aggr_expr
     }
 
@@ -876,7 +879,7 @@ impl ExecutionPlan for AggregateExec {
 fn create_schema(
     input_schema: &Schema,
     group_by: &PhysicalGroupBy,
-    aggr_expr: &[AggregateFunctionExpr],
+    aggr_expr: &[Arc<AggregateFunctionExpr>],
     mode: AggregateMode,
 ) -> Result<Schema> {
     let mut fields = Vec::with_capacity(group_by.num_output_exprs() + 
aggr_expr.len());
@@ -1006,7 +1009,7 @@ pub fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> 
Vec<T> {
 /// A `LexRequirement` instance, which is the requirement that satisfies all 
the
 /// aggregate requirements. Returns an error in case of conflicting 
requirements.
 pub fn get_finer_aggregate_exprs_requirement(
-    aggr_exprs: &mut [AggregateFunctionExpr],
+    aggr_exprs: &mut [Arc<AggregateFunctionExpr>],
     group_by: &PhysicalGroupBy,
     eq_properties: &EquivalenceProperties,
     agg_mode: &AggregateMode,
@@ -1034,7 +1037,7 @@ pub fn get_finer_aggregate_exprs_requirement(
                     // Reverse requirement is satisfied by exiting ordering.
                     // Hence reverse the aggregator
                     requirement = finer_ordering;
-                    *aggr_expr = reverse_aggr_expr;
+                    *aggr_expr = Arc::new(reverse_aggr_expr);
                     continue;
                 }
             }
@@ -1058,7 +1061,7 @@ pub fn get_finer_aggregate_exprs_requirement(
                 // There is a requirement that both satisfies existing 
requirement and reverse
                 // aggregate requirement. Use updated requirement
                 requirement = finer_ordering;
-                *aggr_expr = reverse_aggr_expr;
+                *aggr_expr = Arc::new(reverse_aggr_expr);
                 continue;
             }
         }
@@ -1080,7 +1083,7 @@ pub fn get_finer_aggregate_exprs_requirement(
 /// * Partial: AggregateFunctionExpr::expressions
 /// * Final: columns of `AggregateFunctionExpr::state_fields()`
 pub fn aggregate_expressions(
-    aggr_expr: &[AggregateFunctionExpr],
+    aggr_expr: &[Arc<AggregateFunctionExpr>],
     mode: &AggregateMode,
     col_idx_base: usize,
 ) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
@@ -1135,7 +1138,7 @@ fn merge_expressions(
 pub type AccumulatorItem = Box<dyn Accumulator>;
 
 pub fn create_accumulators(
-    aggr_expr: &[AggregateFunctionExpr],
+    aggr_expr: &[Arc<AggregateFunctionExpr>],
 ) -> Result<Vec<AccumulatorItem>> {
     aggr_expr
         .iter()
@@ -1458,10 +1461,12 @@ mod tests {
             ],
         );
 
-        let aggregates = vec![AggregateExprBuilder::new(count_udaf(), 
vec![lit(1i8)])
-            .schema(Arc::clone(&input_schema))
-            .alias("COUNT(1)")
-            .build()?];
+        let aggregates = vec![Arc::new(
+            AggregateExprBuilder::new(count_udaf(), vec![lit(1i8)])
+                .schema(Arc::clone(&input_schema))
+                .alias("COUNT(1)")
+                .build()?,
+        )];
 
         let task_ctx = if spill {
             // adjust the max memory size to have the partial aggregate result 
for spill mode.
@@ -1596,13 +1601,12 @@ mod tests {
             vec![vec![false]],
         );
 
-        let aggregates: Vec<AggregateFunctionExpr> =
-            vec![
-                AggregateExprBuilder::new(avg_udaf(), vec![col("b", 
&input_schema)?])
-                    .schema(Arc::clone(&input_schema))
-                    .alias("AVG(b)")
-                    .build()?,
-            ];
+        let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+            AggregateExprBuilder::new(avg_udaf(), vec![col("b", 
&input_schema)?])
+                .schema(Arc::clone(&input_schema))
+                .alias("AVG(b)")
+                .build()?,
+        )];
 
         let task_ctx = if spill {
             // set to an appropriate value to trigger spill
@@ -1925,17 +1929,16 @@ mod tests {
         );
 
         // something that allocates within the aggregator
-        let aggregates_v0: Vec<AggregateFunctionExpr> =
-            vec![test_median_agg_expr(Arc::clone(&input_schema))?];
+        let aggregates_v0: Vec<Arc<AggregateFunctionExpr>> =
+            vec![Arc::new(test_median_agg_expr(Arc::clone(&input_schema))?)];
 
         // use fast-path in `row_hash.rs`.
-        let aggregates_v2: Vec<AggregateFunctionExpr> =
-            vec![
-                AggregateExprBuilder::new(avg_udaf(), vec![col("b", 
&input_schema)?])
-                    .schema(Arc::clone(&input_schema))
-                    .alias("AVG(b)")
-                    .build()?,
-            ];
+        let aggregates_v2: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+            AggregateExprBuilder::new(avg_udaf(), vec![col("b", 
&input_schema)?])
+                .schema(Arc::clone(&input_schema))
+                .alias("AVG(b)")
+                .build()?,
+        )];
 
         for (version, groups, aggregates) in [
             (0, groups_none, aggregates_v0),
@@ -1989,13 +1992,12 @@ mod tests {
 
         let groups = PhysicalGroupBy::default();
 
-        let aggregates: Vec<AggregateFunctionExpr> =
-            vec![
-                AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?])
-                    .schema(Arc::clone(&schema))
-                    .alias("AVG(a)")
-                    .build()?,
-            ];
+        let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+            AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?])
+                .schema(Arc::clone(&schema))
+                .alias("AVG(a)")
+                .build()?,
+        )];
 
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 
1));
         let refs = blocking_exec.refs();
@@ -2029,13 +2031,12 @@ mod tests {
         let groups =
             PhysicalGroupBy::new_single(vec![(col("a", &schema)?, 
"a".to_string())]);
 
-        let aggregates: Vec<AggregateFunctionExpr> =
-            vec![
-                AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
-                    .schema(Arc::clone(&schema))
-                    .alias("AVG(b)")
-                    .build()?,
-            ];
+        let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+            AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
+                .schema(Arc::clone(&schema))
+                .alias("AVG(b)")
+                .build()?,
+        )];
 
         let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 
1));
         let refs = blocking_exec.refs();
@@ -2080,7 +2081,7 @@ mod tests {
     fn test_first_value_agg_expr(
         schema: &Schema,
         sort_options: SortOptions,
-    ) -> Result<AggregateFunctionExpr> {
+    ) -> Result<Arc<AggregateFunctionExpr>> {
         let ordering_req = [PhysicalSortExpr {
             expr: col("b", schema)?,
             options: sort_options,
@@ -2092,13 +2093,14 @@ mod tests {
             .schema(Arc::new(schema.clone()))
             .alias(String::from("first_value(b) ORDER BY [b ASC NULLS LAST]"))
             .build()
+            .map(Arc::new)
     }
 
     // LAST_VALUE(b ORDER BY b <SortOptions>)
     fn test_last_value_agg_expr(
         schema: &Schema,
         sort_options: SortOptions,
-    ) -> Result<AggregateFunctionExpr> {
+    ) -> Result<Arc<AggregateFunctionExpr>> {
         let ordering_req = [PhysicalSortExpr {
             expr: col("b", schema)?,
             options: sort_options,
@@ -2109,6 +2111,7 @@ mod tests {
             .schema(Arc::new(schema.clone()))
             .alias(String::from("last_value(b) ORDER BY [b ASC NULLS LAST]"))
             .build()
+            .map(Arc::new)
     }
 
     // This function either constructs the physical plan below,
@@ -2153,7 +2156,7 @@ mod tests {
             descending: false,
             nulls_first: false,
         };
-        let aggregates: Vec<AggregateFunctionExpr> = if is_first_acc {
+        let aggregates: Vec<Arc<AggregateFunctionExpr>> = if is_first_acc {
             vec![test_first_value_agg_expr(&schema, sort_options)?]
         } else {
             vec![test_last_value_agg_expr(&schema, sort_options)?]
@@ -2289,6 +2292,7 @@ mod tests {
                     .order_by(ordering_req.to_vec())
                     .schema(Arc::clone(&test_schema))
                     .build()
+                    .map(Arc::new)
                     .unwrap()
             })
             .collect::<Vec<_>>();
@@ -2318,7 +2322,7 @@ mod tests {
         };
         let groups = PhysicalGroupBy::new_single(vec![(col_a, 
"a".to_string())]);
 
-        let aggregates: Vec<AggregateFunctionExpr> = vec![
+        let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![
             test_first_value_agg_expr(&schema, option_desc)?,
             test_last_value_agg_expr(&schema, option_desc)?,
         ];
@@ -2376,11 +2380,12 @@ mod tests {
             ],
         );
 
-        let aggregates: Vec<AggregateFunctionExpr> =
+        let aggregates: Vec<Arc<AggregateFunctionExpr>> =
             vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)])
                 .schema(Arc::clone(&schema))
                 .alias("1")
-                .build()?];
+                .build()
+                .map(Arc::new)?];
 
         let input_batches = (0..4)
             .map(|_| {
@@ -2512,7 +2517,8 @@ mod tests {
         )
         .schema(Arc::clone(&batch.schema()))
         .alias(String::from("SUM(value)"))
-        .build()?];
+        .build()
+        .map(Arc::new)?];
 
         let input = Arc::new(MemoryExec::try_new(
             &[vec![batch.clone()]],
@@ -2560,7 +2566,8 @@ mod tests {
                 AggregateExprBuilder::new(count_udaf(), vec![col("val", 
&schema)?])
                     .schema(Arc::clone(&schema))
                     .alias(String::from("COUNT(val)"))
-                    .build()?,
+                    .build()
+                    .map(Arc::new)?,
             ];
 
         let input_data = vec![
@@ -2641,7 +2648,8 @@ mod tests {
                 AggregateExprBuilder::new(count_udaf(), vec![col("val", 
&schema)?])
                     .schema(Arc::clone(&schema))
                     .alias(String::from("COUNT(val)"))
-                    .build()?,
+                    .build()
+                    .map(Arc::new)?,
             ];
 
         let input_data = vec![
@@ -2728,7 +2736,8 @@ mod tests {
                 AggregateExprBuilder::new(count_udaf(), vec![col("a", 
&input_schema)?])
                     .schema(Arc::clone(&input_schema))
                     .alias("COUNT(a)")
-                    .build()?,
+                    .build()
+                    .map(Arc::new)?,
             ];
 
         let grouping_set = PhysicalGroupBy::new(
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs 
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 624844b6b9..7d21cc2f19 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -591,7 +591,7 @@ impl GroupedHashAggregateStream {
 /// that is supported by the aggregate, or a
 /// [`GroupsAccumulatorAdapter`] if not.
 pub(crate) fn create_group_accumulator(
-    agg_expr: &AggregateFunctionExpr,
+    agg_expr: &Arc<AggregateFunctionExpr>,
 ) -> Result<Box<dyn GroupsAccumulator>> {
     if agg_expr.groups_accumulator_supported() {
         agg_expr.create_groups_accumulator()
@@ -601,7 +601,7 @@ pub(crate) fn create_group_accumulator(
             "Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}",
             agg_expr.name()
         );
-        let agg_expr_captured = agg_expr.clone();
+        let agg_expr_captured = Arc::clone(agg_expr);
         let factory = move || agg_expr_captured.create_accumulator();
         Ok(Box::new(GroupsAccumulatorAdapter::new(factory)))
     }
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index adf61f27bc..f6902fcbe2 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -119,7 +119,8 @@ pub fn create_window_expr(
                 .schema(Arc::new(input_schema.clone()))
                 .alias(name)
                 .with_ignore_nulls(ignore_nulls)
-                .build()?;
+                .build()
+                .map(Arc::new)?;
             window_expr_from_aggregate_expr(
                 partition_by,
                 order_by,
@@ -142,7 +143,7 @@ fn window_expr_from_aggregate_expr(
     partition_by: &[Arc<dyn PhysicalExpr>],
     order_by: &[PhysicalSortExpr],
     window_frame: Arc<WindowFrame>,
-    aggregate: AggregateFunctionExpr,
+    aggregate: Arc<AggregateFunctionExpr>,
 ) -> Arc<dyn WindowExpr> {
     // Is there a potentially unlimited sized window frame?
     let unbounded_window = window_frame.start_bound.is_unbounded();
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 9a6850cb21..634ae284c9 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -488,7 +488,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                     })
                     .collect::<Result<Vec<_>, _>>()?;
 
-                let physical_aggr_expr: Vec<AggregateFunctionExpr> = hash_agg
+                let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = 
hash_agg
                     .aggr_expr
                     .iter()
                     .zip(hash_agg.aggr_expr_name.iter())
@@ -518,6 +518,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                                                 
.with_distinct(agg_node.distinct)
                                                 .order_by(ordering_req)
                                                 .build()
+                                                .map(Arc::new)
                                         }
                                     }
                                 }).transpose()?.ok_or_else(|| {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 6072baca68..33eca07231 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -48,7 +48,7 @@ use crate::protobuf::{
 use super::PhysicalExtensionCodec;
 
 pub fn serialize_physical_aggr_expr(
-    aggr_expr: AggregateFunctionExpr,
+    aggr_expr: Arc<AggregateFunctionExpr>,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<protobuf::PhysicalExprNode> {
     let expressions = serialize_physical_exprs(&aggr_expr.expressions(), 
codec)?;
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 025676f790..4a9bf6afb4 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -73,7 +73,6 @@ use 
datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
 use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
 use datafusion::physical_plan::windows::{
@@ -305,7 +304,8 @@ fn roundtrip_window() -> Result<()> {
         )
         .schema(Arc::clone(&schema))
         .alias("avg(b)")
-        .build()?,
+        .build()
+        .map(Arc::new)?,
         &[],
         &[],
         Arc::new(WindowFrame::new(None)),
@@ -321,7 +321,8 @@ fn roundtrip_window() -> Result<()> {
     let sum_expr = AggregateExprBuilder::new(sum_udaf(), args)
         .schema(Arc::clone(&schema))
         .alias("SUM(a) RANGE BETWEEN CURRENT ROW AND UNBOUNDED PRECEEDING")
-        .build()?;
+        .build()
+        .map(Arc::new)?;
 
     let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new(
         sum_expr,
@@ -367,13 +368,13 @@ fn rountrip_aggregate() -> Result<()> {
             .alias("NTH_VALUE(b, 1)")
             .build()?;
 
-    let test_cases: Vec<Vec<AggregateFunctionExpr>> = vec![
+    let test_cases = vec![
         // AVG
-        vec![avg_expr],
+        vec![Arc::new(avg_expr)],
         // NTH_VALUE
-        vec![nth_expr],
+        vec![Arc::new(nth_expr)],
         // STRING_AGG
-        vec![str_agg_expr],
+        vec![Arc::new(str_agg_expr)],
     ];
 
     for aggregates in test_cases {
@@ -400,12 +401,13 @@ fn rountrip_aggregate_with_limit() -> Result<()> {
     let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
         vec![(col("a", &schema)?, "unused".to_string())];
 
-    let aggregates: Vec<AggregateFunctionExpr> =
+    let aggregates =
         vec![
             AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
                 .schema(Arc::clone(&schema))
                 .alias("AVG(b)")
-                .build()?,
+                .build()
+                .map(Arc::new)?,
         ];
 
     let agg = AggregateExec::try_new(
@@ -429,13 +431,14 @@ fn rountrip_aggregate_with_approx_pencentile_cont() -> 
Result<()> {
     let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
         vec![(col("a", &schema)?, "unused".to_string())];
 
-    let aggregates: Vec<AggregateFunctionExpr> = 
vec![AggregateExprBuilder::new(
+    let aggregates = vec![AggregateExprBuilder::new(
         approx_percentile_cont_udaf(),
         vec![col("b", &schema)?, lit(0.5)],
     )
     .schema(Arc::clone(&schema))
     .alias("APPROX_PERCENTILE_CONT(b, 0.5)")
-    .build()?];
+    .build()
+    .map(Arc::new)?];
 
     let agg = AggregateExec::try_new(
         AggregateMode::Final,
@@ -464,13 +467,14 @@ fn rountrip_aggregate_with_sort() -> Result<()> {
         },
     }];
 
-    let aggregates: Vec<AggregateFunctionExpr> =
+    let aggregates =
         vec![
             AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", 
&schema)?])
                 .schema(Arc::clone(&schema))
                 .alias("ARRAY_AGG(b)")
                 .order_by(sort_exprs)
-                .build()?,
+                .build()
+                .map(Arc::new)?,
         ];
 
     let agg = AggregateExec::try_new(
@@ -531,12 +535,13 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
     let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
         vec![(col("a", &schema)?, "unused".to_string())];
 
-    let aggregates: Vec<AggregateFunctionExpr> =
+    let aggregates =
         vec![
             AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?])
                 .schema(Arc::clone(&schema))
                 .alias("example_agg")
-                .build()?,
+                .build()
+                .map(Arc::new)?,
         ];
 
     roundtrip_test_with_context(
@@ -1001,7 +1006,8 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> {
         AggregateExprBuilder::new(max_udaf(), vec![udf_expr as Arc<dyn 
PhysicalExpr>])
             .schema(schema.clone())
             .alias("max")
-            .build()?;
+            .build()
+            .map(Arc::new)?;
 
     let window = Arc::new(WindowAggExec::try_new(
         vec![Arc::new(PlainAggregateWindowExpr::new(
@@ -1052,7 +1058,8 @@ fn roundtrip_aggregate_udf_extension_codec() -> 
Result<()> {
     let aggr_expr = AggregateExprBuilder::new(Arc::clone(&udaf), 
aggr_args.clone())
         .schema(Arc::clone(&schema))
         .alias("aggregate_udf")
-        .build()?;
+        .build()
+        .map(Arc::new)?;
 
     let filter = Arc::new(FilterExec::try_new(
         Arc::new(BinaryExpr::new(
@@ -1079,7 +1086,8 @@ fn roundtrip_aggregate_udf_extension_codec() -> 
Result<()> {
         .alias("aggregate_udf")
         .distinct()
         .ignore_nulls()
-        .build()?;
+        .build()
+        .map(Arc::new)?;
 
     let aggregate = Arc::new(AggregateExec::try_new(
         AggregateMode::Final,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to