This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 10af8a7366 Improve performance for physical plan creation with many
columns (#12950)
10af8a7366 is described below
commit 10af8a73662f4f6aac09a34157b7cf5fee034502
Author: Albert Skalt <[email protected]>
AuthorDate: Fri Oct 18 23:41:53 2024 +0300
Improve performance for physical plan creation with many columns (#12950)
* Add a benchmark for physical plan creation with many aggregates
* Wrap AggregateFunctionExpr with Arc
Patch f5c47fa274d53c1d524a1fb788d9a063bf5240ef removed Arc wrappers for
AggregateFunctionExpr.
However, this can be inefficient. When the physical optimizer decides to
replace a node's child with another,
it clones the node (with `with_new_children`). Suppose that node is an
`AggregateExec` that contains
hundreds of aggregates; these aggregates are then cloned each time.
This patch restores the Arc wrapping so that only a pointer is cloned,
rather than the AggregateFunctionExpr itself.
* Do not build mapping if parent does not require any
This patch adds a small optimization that can soften the edges on
some queries. If there are no parent requirements, we do not need to
build the column mapping.
---
datafusion/core/benches/sql_planner.rs | 14 +++
.../src/physical_optimizer/update_aggr_exprs.rs | 10 +-
datafusion/core/src/physical_planner.rs | 5 +-
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 1 +
.../combine_partial_final_agg.rs | 8 +-
.../limited_distinct_aggregation.rs | 16 +--
datafusion/physical-expr/src/aggregate.rs | 2 +-
datafusion/physical-expr/src/utils/mod.rs | 4 +
datafusion/physical-expr/src/window/aggregate.rs | 8 +-
.../physical-expr/src/window/sliding_aggregate.rs | 13 ++-
.../physical-optimizer/src/aggregate_statistics.rs | 24 ++---
.../src/combine_partial_final_agg.rs | 2 +-
datafusion/physical-plan/src/aggregates/mod.rs | 119 +++++++++++----------
.../physical-plan/src/aggregates/row_hash.rs | 4 +-
datafusion/physical-plan/src/windows/mod.rs | 5 +-
datafusion/proto/src/physical_plan/mod.rs | 3 +-
datafusion/proto/src/physical_plan/to_proto.rs | 2 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 44 ++++----
18 files changed, 165 insertions(+), 119 deletions(-)
diff --git a/datafusion/core/benches/sql_planner.rs
b/datafusion/core/benches/sql_planner.rs
index 00f6d59167..e7c35c8d86 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -144,6 +144,20 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});
+ c.bench_function("physical_select_aggregates_from_200", |b| {
+ let mut aggregates = String::new();
+ for i in 0..200 {
+ if i > 0 {
+ aggregates.push_str(", ");
+ }
+ aggregates.push_str(format!("MAX(a{})", i).as_str());
+ }
+ let query = format!("SELECT {} FROM t1", aggregates);
+ b.iter(|| {
+ physical_plan(&ctx, &query);
+ });
+ });
+
// --- TPC-H ---
let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
index c0d9140c02..26cdd65883 100644
--- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -131,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder {
/// successfully. Any errors occurring during the conversion process are
/// passed through.
fn try_convert_aggregate_if_better(
- aggr_exprs: Vec<AggregateFunctionExpr>,
+ aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
prefix_requirement: &[PhysicalSortRequirement],
eq_properties: &EquivalenceProperties,
-) -> Result<Vec<AggregateFunctionExpr>> {
+) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
aggr_exprs
.into_iter()
.map(|aggr_expr| {
@@ -154,7 +154,7 @@ fn try_convert_aggregate_if_better(
let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs);
if eq_properties.ordering_satisfy_requirement(&reqs) {
// Existing ordering satisfies the aggregator requirements:
- aggr_expr.with_beneficial_ordering(true)?
+ aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
} else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
prefix_requirement,
&reverse_aggr_req,
@@ -163,12 +163,14 @@ fn try_convert_aggregate_if_better(
// given the existing ordering (if possible):
aggr_expr
.reverse_expr()
+ .map(Arc::new)
.unwrap_or(aggr_expr)
.with_beneficial_ordering(true)?
+ .map(Arc::new)
} else {
// There is no beneficial ordering present -- aggregation
// will still work albeit in a less efficient mode.
- aggr_expr.with_beneficial_ordering(false)?
+ aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
}
.ok_or_else(|| {
plan_datafusion_err!(
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index cf2a157b04..a4dffd3d02 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1523,7 +1523,7 @@ pub fn create_window_expr(
}
type AggregateExprWithOptionalArgs = (
- AggregateFunctionExpr,
+ Arc<AggregateFunctionExpr>,
// The filter clause, if any
Option<Arc<dyn PhysicalExpr>>,
// Ordering requirements, if any
@@ -1587,7 +1587,8 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
.alias(name)
.with_ignore_nulls(ignore_nulls)
.with_distinct(*distinct)
- .build()?;
+ .build()
+ .map(Arc::new)?;
(agg_expr, filter, physical_sort_exprs)
};
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 34061a64d7..ff51282933 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -405,6 +405,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>,
group_by_columns: Vec<&str
.schema(Arc::clone(&schema))
.alias("sum1")
.build()
+ .map(Arc::new)
.unwrap(),
];
let expr = group_by_columns
diff --git
a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
index 24e46b3ad9..85076abdaf 100644
--- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs
@@ -84,7 +84,7 @@ fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
fn partial_aggregate_exec(
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
- aggr_expr: Vec<AggregateFunctionExpr>,
+ aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
@@ -104,7 +104,7 @@ fn partial_aggregate_exec(
fn final_aggregate_exec(
input: Arc<dyn ExecutionPlan>,
group_by: PhysicalGroupBy,
- aggr_expr: Vec<AggregateFunctionExpr>,
+ aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
@@ -130,11 +130,12 @@ fn count_expr(
expr: Arc<dyn PhysicalExpr>,
name: &str,
schema: &Schema,
-) -> AggregateFunctionExpr {
+) -> Arc<AggregateFunctionExpr> {
AggregateExprBuilder::new(count_udaf(), vec![expr])
.schema(Arc::new(schema.clone()))
.alias(name)
.build()
+ .map(Arc::new)
.unwrap()
}
@@ -218,6 +219,7 @@ fn aggregations_with_group_combined() ->
datafusion_common::Result<()> {
.schema(Arc::clone(&schema))
.alias("Sum(b)")
.build()
+ .map(Arc::new)
.unwrap(),
];
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
diff --git
a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
index 042f6d6225..d6991711f5 100644
--- a/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
+++ b/datafusion/core/tests/physical_optimizer/limited_distinct_aggregation.rs
@@ -347,10 +347,10 @@ fn test_has_aggregate_expression() -> Result<()> {
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema, vec!["a".to_string()]),
- vec![agg.count_expr(&schema)], /* aggr_expr */
- vec![None], /* filter_expr */
- source, /* input */
- schema.clone(), /* input_schema */
+ vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
+ vec![None], /* filter_expr */
+ source, /* input */
+ schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
@@ -384,10 +384,10 @@ fn test_has_filter() -> Result<()> {
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
- vec![agg.count_expr(&schema)], /* aggr_expr */
- vec![filter_expr], /* filter_expr */
- source, /* input */
- schema.clone(), /* input_schema */
+ vec![Arc::new(agg.count_expr(&schema))], /* aggr_expr */
+ vec![filter_expr], /* filter_expr */
+ source, /* input */
+ schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
diff --git a/datafusion/physical-expr/src/aggregate.rs
b/datafusion/physical-expr/src/aggregate.rs
index 866596d0b6..6330c24024 100644
--- a/datafusion/physical-expr/src/aggregate.rs
+++ b/datafusion/physical-expr/src/aggregate.rs
@@ -328,7 +328,7 @@ impl AggregateFunctionExpr {
/// not implement the method, returns an error. Order insensitive and hard
/// requirement aggregators return `Ok(None)`.
pub fn with_beneficial_ordering(
- self,
+ self: Arc<Self>,
beneficial_ordering: bool,
) -> Result<Option<AggregateFunctionExpr>> {
let Some(updated_fn) = self
diff --git a/datafusion/physical-expr/src/utils/mod.rs
b/datafusion/physical-expr/src/utils/mod.rs
index 4c37db4849..4bd022975a 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -86,6 +86,10 @@ pub fn map_columns_before_projection(
parent_required: &[Arc<dyn PhysicalExpr>],
proj_exprs: &[(Arc<dyn PhysicalExpr>, String)],
) -> Vec<Arc<dyn PhysicalExpr>> {
+ if parent_required.is_empty() {
+ // No need to build mapping.
+ return vec![];
+ }
let column_mapping = proj_exprs
.iter()
.filter_map(|(expr, name)| {
diff --git a/datafusion/physical-expr/src/window/aggregate.rs
b/datafusion/physical-expr/src/window/aggregate.rs
index d012fef93b..3fe5d842df 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys,
PhysicalExpr};
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct PlainAggregateWindowExpr {
- aggregate: AggregateFunctionExpr,
+ aggregate: Arc<AggregateFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Arc<WindowFrame>,
@@ -50,7 +50,7 @@ pub struct PlainAggregateWindowExpr {
impl PlainAggregateWindowExpr {
/// Create a new aggregate window function expression
pub fn new(
- aggregate: AggregateFunctionExpr,
+ aggregate: Arc<AggregateFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Arc<WindowFrame>,
@@ -137,14 +137,14 @@ impl WindowExpr for PlainAggregateWindowExpr {
let reverse_window_frame = self.window_frame.reverse();
if reverse_window_frame.start_bound.is_unbounded() {
Arc::new(PlainAggregateWindowExpr::new(
- reverse_expr,
+ Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
)) as _
} else {
Arc::new(SlidingAggregateWindowExpr::new(
- reverse_expr,
+ Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs
b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 143d59eb44..b889ec8c5d 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys,
PhysicalExpr};
/// See comments on [`WindowExpr`] for more details.
#[derive(Debug)]
pub struct SlidingAggregateWindowExpr {
- aggregate: AggregateFunctionExpr,
+ aggregate: Arc<AggregateFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: Vec<PhysicalSortExpr>,
window_frame: Arc<WindowFrame>,
@@ -50,7 +50,7 @@ pub struct SlidingAggregateWindowExpr {
impl SlidingAggregateWindowExpr {
/// Create a new (sliding) aggregate window function expression.
pub fn new(
- aggregate: AggregateFunctionExpr,
+ aggregate: Arc<AggregateFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Arc<WindowFrame>,
@@ -121,14 +121,14 @@ impl WindowExpr for SlidingAggregateWindowExpr {
let reverse_window_frame = self.window_frame.reverse();
if reverse_window_frame.start_bound.is_unbounded() {
Arc::new(PlainAggregateWindowExpr::new(
- reverse_expr,
+ Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
)) as _
} else {
Arc::new(SlidingAggregateWindowExpr::new(
- reverse_expr,
+ Arc::new(reverse_expr),
&self.partition_by.clone(),
&reverse_order_bys(&self.order_by),
Arc::new(self.window_frame.reverse()),
@@ -159,7 +159,10 @@ impl WindowExpr for SlidingAggregateWindowExpr {
})
.collect::<Vec<_>>();
Some(Arc::new(SlidingAggregateWindowExpr {
- aggregate: self.aggregate.with_new_expressions(args, vec![])?,
+ aggregate: self
+ .aggregate
+ .with_new_expressions(args, vec![])
+ .map(Arc::new)?,
partition_by: partition_bys,
order_by: new_order_by,
window_frame: Arc::clone(&self.window_frame),
diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs
b/datafusion/physical-optimizer/src/aggregate_statistics.rs
index fd21362fd3..27870c7865 100644
--- a/datafusion/physical-optimizer/src/aggregate_statistics.rs
+++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs
@@ -312,7 +312,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
@@ -321,7 +321,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
@@ -342,7 +342,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
@@ -351,7 +351,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
@@ -371,7 +371,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
@@ -383,7 +383,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
@@ -403,7 +403,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
source,
Arc::clone(&schema),
@@ -415,7 +415,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
@@ -446,7 +446,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
filter,
Arc::clone(&schema),
@@ -455,7 +455,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
@@ -491,7 +491,7 @@ mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
filter,
Arc::clone(&schema),
@@ -500,7 +500,7 @@ mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
- vec![agg.count_expr(&schema)],
+ vec![Arc::new(agg.count_expr(&schema))],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
index 4e352e25b5..86f7e73e9e 100644
--- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
+++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -125,7 +125,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate
{
type GroupExprsRef<'a> = (
&'a PhysicalGroupBy,
- &'a [AggregateFunctionExpr],
+ &'a [Arc<AggregateFunctionExpr>],
&'a [Option<Arc<dyn PhysicalExpr>>],
);
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index 296c5811e5..f36bd920e8 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -351,7 +351,7 @@ pub struct AggregateExec {
/// Group by expressions
group_by: PhysicalGroupBy,
/// Aggregate expressions
- aggr_expr: Vec<AggregateFunctionExpr>,
+ aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
/// FILTER (WHERE clause) expression for each aggregate expression
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
/// Set if the output of this aggregation is truncated by a upstream
sort/limit clause
@@ -378,7 +378,10 @@ impl AggregateExec {
/// Function used in `OptimizeAggregateOrder` optimizer rule,
/// where we need parts of the new value, others cloned from the old one
/// Rewrites aggregate exec with new aggregate expressions.
- pub fn with_new_aggr_exprs(&self, aggr_expr: Vec<AggregateFunctionExpr>)
-> Self {
+ pub fn with_new_aggr_exprs(
+ &self,
+ aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
+ ) -> Self {
Self {
aggr_expr,
// clone the rest of the fields
@@ -404,7 +407,7 @@ impl AggregateExec {
pub fn try_new(
mode: AggregateMode,
group_by: PhysicalGroupBy,
- aggr_expr: Vec<AggregateFunctionExpr>,
+ aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
@@ -435,7 +438,7 @@ impl AggregateExec {
fn try_new_with_schema(
mode: AggregateMode,
group_by: PhysicalGroupBy,
- mut aggr_expr: Vec<AggregateFunctionExpr>,
+ mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
@@ -545,7 +548,7 @@ impl AggregateExec {
}
/// Aggregate expressions
- pub fn aggr_expr(&self) -> &[AggregateFunctionExpr] {
+ pub fn aggr_expr(&self) -> &[Arc<AggregateFunctionExpr>] {
&self.aggr_expr
}
@@ -876,7 +879,7 @@ impl ExecutionPlan for AggregateExec {
fn create_schema(
input_schema: &Schema,
group_by: &PhysicalGroupBy,
- aggr_expr: &[AggregateFunctionExpr],
+ aggr_expr: &[Arc<AggregateFunctionExpr>],
mode: AggregateMode,
) -> Result<Schema> {
let mut fields = Vec::with_capacity(group_by.num_output_exprs() +
aggr_expr.len());
@@ -1006,7 +1009,7 @@ pub fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) ->
Vec<T> {
/// A `LexRequirement` instance, which is the requirement that satisfies all
the
/// aggregate requirements. Returns an error in case of conflicting
requirements.
pub fn get_finer_aggregate_exprs_requirement(
- aggr_exprs: &mut [AggregateFunctionExpr],
+ aggr_exprs: &mut [Arc<AggregateFunctionExpr>],
group_by: &PhysicalGroupBy,
eq_properties: &EquivalenceProperties,
agg_mode: &AggregateMode,
@@ -1034,7 +1037,7 @@ pub fn get_finer_aggregate_exprs_requirement(
// Reverse requirement is satisfied by exiting ordering.
// Hence reverse the aggregator
requirement = finer_ordering;
- *aggr_expr = reverse_aggr_expr;
+ *aggr_expr = Arc::new(reverse_aggr_expr);
continue;
}
}
@@ -1058,7 +1061,7 @@ pub fn get_finer_aggregate_exprs_requirement(
// There is a requirement that both satisfies existing
requirement and reverse
// aggregate requirement. Use updated requirement
requirement = finer_ordering;
- *aggr_expr = reverse_aggr_expr;
+ *aggr_expr = Arc::new(reverse_aggr_expr);
continue;
}
}
@@ -1080,7 +1083,7 @@ pub fn get_finer_aggregate_exprs_requirement(
/// * Partial: AggregateFunctionExpr::expressions
/// * Final: columns of `AggregateFunctionExpr::state_fields()`
pub fn aggregate_expressions(
- aggr_expr: &[AggregateFunctionExpr],
+ aggr_expr: &[Arc<AggregateFunctionExpr>],
mode: &AggregateMode,
col_idx_base: usize,
) -> Result<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
@@ -1135,7 +1138,7 @@ fn merge_expressions(
pub type AccumulatorItem = Box<dyn Accumulator>;
pub fn create_accumulators(
- aggr_expr: &[AggregateFunctionExpr],
+ aggr_expr: &[Arc<AggregateFunctionExpr>],
) -> Result<Vec<AccumulatorItem>> {
aggr_expr
.iter()
@@ -1458,10 +1461,12 @@ mod tests {
],
);
- let aggregates = vec![AggregateExprBuilder::new(count_udaf(),
vec![lit(1i8)])
- .schema(Arc::clone(&input_schema))
- .alias("COUNT(1)")
- .build()?];
+ let aggregates = vec![Arc::new(
+ AggregateExprBuilder::new(count_udaf(), vec![lit(1i8)])
+ .schema(Arc::clone(&input_schema))
+ .alias("COUNT(1)")
+ .build()?,
+ )];
let task_ctx = if spill {
// adjust the max memory size to have the partial aggregate result
for spill mode.
@@ -1596,13 +1601,12 @@ mod tests {
vec![vec![false]],
);
- let aggregates: Vec<AggregateFunctionExpr> =
- vec![
- AggregateExprBuilder::new(avg_udaf(), vec![col("b",
&input_schema)?])
- .schema(Arc::clone(&input_schema))
- .alias("AVG(b)")
- .build()?,
- ];
+ let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+ AggregateExprBuilder::new(avg_udaf(), vec![col("b",
&input_schema)?])
+ .schema(Arc::clone(&input_schema))
+ .alias("AVG(b)")
+ .build()?,
+ )];
let task_ctx = if spill {
// set to an appropriate value to trigger spill
@@ -1925,17 +1929,16 @@ mod tests {
);
// something that allocates within the aggregator
- let aggregates_v0: Vec<AggregateFunctionExpr> =
- vec![test_median_agg_expr(Arc::clone(&input_schema))?];
+ let aggregates_v0: Vec<Arc<AggregateFunctionExpr>> =
+ vec![Arc::new(test_median_agg_expr(Arc::clone(&input_schema))?)];
// use fast-path in `row_hash.rs`.
- let aggregates_v2: Vec<AggregateFunctionExpr> =
- vec![
- AggregateExprBuilder::new(avg_udaf(), vec![col("b",
&input_schema)?])
- .schema(Arc::clone(&input_schema))
- .alias("AVG(b)")
- .build()?,
- ];
+ let aggregates_v2: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+ AggregateExprBuilder::new(avg_udaf(), vec![col("b",
&input_schema)?])
+ .schema(Arc::clone(&input_schema))
+ .alias("AVG(b)")
+ .build()?,
+ )];
for (version, groups, aggregates) in [
(0, groups_none, aggregates_v0),
@@ -1989,13 +1992,12 @@ mod tests {
let groups = PhysicalGroupBy::default();
- let aggregates: Vec<AggregateFunctionExpr> =
- vec![
- AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?])
- .schema(Arc::clone(&schema))
- .alias("AVG(a)")
- .build()?,
- ];
+ let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+ AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?])
+ .schema(Arc::clone(&schema))
+ .alias("AVG(a)")
+ .build()?,
+ )];
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema),
1));
let refs = blocking_exec.refs();
@@ -2029,13 +2031,12 @@ mod tests {
let groups =
PhysicalGroupBy::new_single(vec![(col("a", &schema)?,
"a".to_string())]);
- let aggregates: Vec<AggregateFunctionExpr> =
- vec![
- AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
- .schema(Arc::clone(&schema))
- .alias("AVG(b)")
- .build()?,
- ];
+ let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![Arc::new(
+ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
+ .schema(Arc::clone(&schema))
+ .alias("AVG(b)")
+ .build()?,
+ )];
let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema),
1));
let refs = blocking_exec.refs();
@@ -2080,7 +2081,7 @@ mod tests {
fn test_first_value_agg_expr(
schema: &Schema,
sort_options: SortOptions,
- ) -> Result<AggregateFunctionExpr> {
+ ) -> Result<Arc<AggregateFunctionExpr>> {
let ordering_req = [PhysicalSortExpr {
expr: col("b", schema)?,
options: sort_options,
@@ -2092,13 +2093,14 @@ mod tests {
.schema(Arc::new(schema.clone()))
.alias(String::from("first_value(b) ORDER BY [b ASC NULLS LAST]"))
.build()
+ .map(Arc::new)
}
// LAST_VALUE(b ORDER BY b <SortOptions>)
fn test_last_value_agg_expr(
schema: &Schema,
sort_options: SortOptions,
- ) -> Result<AggregateFunctionExpr> {
+ ) -> Result<Arc<AggregateFunctionExpr>> {
let ordering_req = [PhysicalSortExpr {
expr: col("b", schema)?,
options: sort_options,
@@ -2109,6 +2111,7 @@ mod tests {
.schema(Arc::new(schema.clone()))
.alias(String::from("last_value(b) ORDER BY [b ASC NULLS LAST]"))
.build()
+ .map(Arc::new)
}
// This function either constructs the physical plan below,
@@ -2153,7 +2156,7 @@ mod tests {
descending: false,
nulls_first: false,
};
- let aggregates: Vec<AggregateFunctionExpr> = if is_first_acc {
+ let aggregates: Vec<Arc<AggregateFunctionExpr>> = if is_first_acc {
vec![test_first_value_agg_expr(&schema, sort_options)?]
} else {
vec![test_last_value_agg_expr(&schema, sort_options)?]
@@ -2289,6 +2292,7 @@ mod tests {
.order_by(ordering_req.to_vec())
.schema(Arc::clone(&test_schema))
.build()
+ .map(Arc::new)
.unwrap()
})
.collect::<Vec<_>>();
@@ -2318,7 +2322,7 @@ mod tests {
};
let groups = PhysicalGroupBy::new_single(vec![(col_a,
"a".to_string())]);
- let aggregates: Vec<AggregateFunctionExpr> = vec![
+ let aggregates: Vec<Arc<AggregateFunctionExpr>> = vec![
test_first_value_agg_expr(&schema, option_desc)?,
test_last_value_agg_expr(&schema, option_desc)?,
];
@@ -2376,11 +2380,12 @@ mod tests {
],
);
- let aggregates: Vec<AggregateFunctionExpr> =
+ let aggregates: Vec<Arc<AggregateFunctionExpr>> =
vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)])
.schema(Arc::clone(&schema))
.alias("1")
- .build()?];
+ .build()
+ .map(Arc::new)?];
let input_batches = (0..4)
.map(|_| {
@@ -2512,7 +2517,8 @@ mod tests {
)
.schema(Arc::clone(&batch.schema()))
.alias(String::from("SUM(value)"))
- .build()?];
+ .build()
+ .map(Arc::new)?];
let input = Arc::new(MemoryExec::try_new(
&[vec![batch.clone()]],
@@ -2560,7 +2566,8 @@ mod tests {
AggregateExprBuilder::new(count_udaf(), vec![col("val",
&schema)?])
.schema(Arc::clone(&schema))
.alias(String::from("COUNT(val)"))
- .build()?,
+ .build()
+ .map(Arc::new)?,
];
let input_data = vec![
@@ -2641,7 +2648,8 @@ mod tests {
AggregateExprBuilder::new(count_udaf(), vec![col("val",
&schema)?])
.schema(Arc::clone(&schema))
.alias(String::from("COUNT(val)"))
- .build()?,
+ .build()
+ .map(Arc::new)?,
];
let input_data = vec![
@@ -2728,7 +2736,8 @@ mod tests {
AggregateExprBuilder::new(count_udaf(), vec![col("a",
&input_schema)?])
.schema(Arc::clone(&input_schema))
.alias("COUNT(a)")
- .build()?,
+ .build()
+ .map(Arc::new)?,
];
let grouping_set = PhysicalGroupBy::new(
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 624844b6b9..7d21cc2f19 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -591,7 +591,7 @@ impl GroupedHashAggregateStream {
/// that is supported by the aggregate, or a
/// [`GroupsAccumulatorAdapter`] if not.
pub(crate) fn create_group_accumulator(
- agg_expr: &AggregateFunctionExpr,
+ agg_expr: &Arc<AggregateFunctionExpr>,
) -> Result<Box<dyn GroupsAccumulator>> {
if agg_expr.groups_accumulator_supported() {
agg_expr.create_groups_accumulator()
@@ -601,7 +601,7 @@ pub(crate) fn create_group_accumulator(
"Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}",
agg_expr.name()
);
- let agg_expr_captured = agg_expr.clone();
+ let agg_expr_captured = Arc::clone(agg_expr);
let factory = move || agg_expr_captured.create_accumulator();
Ok(Box::new(GroupsAccumulatorAdapter::new(factory)))
}
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index adf61f27bc..f6902fcbe2 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -119,7 +119,8 @@ pub fn create_window_expr(
.schema(Arc::new(input_schema.clone()))
.alias(name)
.with_ignore_nulls(ignore_nulls)
- .build()?;
+ .build()
+ .map(Arc::new)?;
window_expr_from_aggregate_expr(
partition_by,
order_by,
@@ -142,7 +143,7 @@ fn window_expr_from_aggregate_expr(
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &[PhysicalSortExpr],
window_frame: Arc<WindowFrame>,
- aggregate: AggregateFunctionExpr,
+ aggregate: Arc<AggregateFunctionExpr>,
) -> Arc<dyn WindowExpr> {
// Is there a potentially unlimited sized window frame?
let unbounded_window = window_frame.start_bound.is_unbounded();
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 9a6850cb21..634ae284c9 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -488,7 +488,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
})
.collect::<Result<Vec<_>, _>>()?;
- let physical_aggr_expr: Vec<AggregateFunctionExpr> = hash_agg
+ let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> =
hash_agg
.aggr_expr
.iter()
.zip(hash_agg.aggr_expr_name.iter())
@@ -518,6 +518,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.with_distinct(agg_node.distinct)
.order_by(ordering_req)
.build()
+ .map(Arc::new)
}
}
}).transpose()?.ok_or_else(|| {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 6072baca68..33eca07231 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -48,7 +48,7 @@ use crate::protobuf::{
use super::PhysicalExtensionCodec;
pub fn serialize_physical_aggr_expr(
- aggr_expr: AggregateFunctionExpr,
+ aggr_expr: Arc<AggregateFunctionExpr>,
codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode> {
let expressions = serialize_physical_exprs(&aggr_expr.expressions(),
codec)?;
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 025676f790..4a9bf6afb4 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -73,7 +73,6 @@ use
datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
-use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
use datafusion::physical_plan::windows::{
@@ -305,7 +304,8 @@ fn roundtrip_window() -> Result<()> {
)
.schema(Arc::clone(&schema))
.alias("avg(b)")
- .build()?,
+ .build()
+ .map(Arc::new)?,
&[],
&[],
Arc::new(WindowFrame::new(None)),
@@ -321,7 +321,8 @@ fn roundtrip_window() -> Result<()> {
let sum_expr = AggregateExprBuilder::new(sum_udaf(), args)
.schema(Arc::clone(&schema))
.alias("SUM(a) RANGE BETWEEN CURRENT ROW AND UNBOUNDED PRECEEDING")
- .build()?;
+ .build()
+ .map(Arc::new)?;
let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new(
sum_expr,
@@ -367,13 +368,13 @@ fn rountrip_aggregate() -> Result<()> {
.alias("NTH_VALUE(b, 1)")
.build()?;
- let test_cases: Vec<Vec<AggregateFunctionExpr>> = vec![
+ let test_cases = vec![
// AVG
- vec![avg_expr],
+ vec![Arc::new(avg_expr)],
// NTH_VALUE
- vec![nth_expr],
+ vec![Arc::new(nth_expr)],
// STRING_AGG
- vec![str_agg_expr],
+ vec![Arc::new(str_agg_expr)],
];
for aggregates in test_cases {
@@ -400,12 +401,13 @@ fn rountrip_aggregate_with_limit() -> Result<()> {
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("a", &schema)?, "unused".to_string())];
- let aggregates: Vec<AggregateFunctionExpr> =
+ let aggregates =
vec![
AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
.schema(Arc::clone(&schema))
.alias("AVG(b)")
- .build()?,
+ .build()
+ .map(Arc::new)?,
];
let agg = AggregateExec::try_new(
@@ -429,13 +431,14 @@ fn rountrip_aggregate_with_approx_pencentile_cont() ->
Result<()> {
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("a", &schema)?, "unused".to_string())];
- let aggregates: Vec<AggregateFunctionExpr> =
vec![AggregateExprBuilder::new(
+ let aggregates = vec![AggregateExprBuilder::new(
approx_percentile_cont_udaf(),
vec![col("b", &schema)?, lit(0.5)],
)
.schema(Arc::clone(&schema))
.alias("APPROX_PERCENTILE_CONT(b, 0.5)")
- .build()?];
+ .build()
+ .map(Arc::new)?];
let agg = AggregateExec::try_new(
AggregateMode::Final,
@@ -464,13 +467,14 @@ fn rountrip_aggregate_with_sort() -> Result<()> {
},
}];
- let aggregates: Vec<AggregateFunctionExpr> =
+ let aggregates =
vec![
AggregateExprBuilder::new(array_agg_udaf(), vec![col("b",
&schema)?])
.schema(Arc::clone(&schema))
.alias("ARRAY_AGG(b)")
.order_by(sort_exprs)
- .build()?,
+ .build()
+ .map(Arc::new)?,
];
let agg = AggregateExec::try_new(
@@ -531,12 +535,13 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("a", &schema)?, "unused".to_string())];
- let aggregates: Vec<AggregateFunctionExpr> =
+ let aggregates =
vec![
AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?])
.schema(Arc::clone(&schema))
.alias("example_agg")
- .build()?,
+ .build()
+ .map(Arc::new)?,
];
roundtrip_test_with_context(
@@ -1001,7 +1006,8 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> {
AggregateExprBuilder::new(max_udaf(), vec![udf_expr as Arc<dyn
PhysicalExpr>])
.schema(schema.clone())
.alias("max")
- .build()?;
+ .build()
+ .map(Arc::new)?;
let window = Arc::new(WindowAggExec::try_new(
vec![Arc::new(PlainAggregateWindowExpr::new(
@@ -1052,7 +1058,8 @@ fn roundtrip_aggregate_udf_extension_codec() ->
Result<()> {
let aggr_expr = AggregateExprBuilder::new(Arc::clone(&udaf),
aggr_args.clone())
.schema(Arc::clone(&schema))
.alias("aggregate_udf")
- .build()?;
+ .build()
+ .map(Arc::new)?;
let filter = Arc::new(FilterExec::try_new(
Arc::new(BinaryExpr::new(
@@ -1079,7 +1086,8 @@ fn roundtrip_aggregate_udf_extension_codec() ->
Result<()> {
.alias("aggregate_udf")
.distinct()
.ignore_nulls()
- .build()?;
+ .build()
+ .map(Arc::new)?;
let aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Final,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]