This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8cffb68d0c Stop copying LogicalPlan and Exprs in
`SingleDistinctToGroupBy` (#10527)
8cffb68d0c is described below
commit 8cffb68d0c4e7d3acd304096f326a157be20b81a
Author: Chunchun Ye <[email protected]>
AuthorDate: Wed May 22 04:51:49 2024 -0500
Stop copying LogicalPlan and Exprs in `SingleDistinctToGroupBy` (#10527)
* chore: merge main and resolve conflict
* chore: use less copy
* chore: remove clone
* remove more clones (#8)
* refactor: use HashSet<&Expr> instead of HashSet<String>
* refactor: remove more cloning
* chore: reduce string allocation
Co-authored-by: Adam Curtis <[email protected]>
* chore: return internal error instead of panicking
* chore: use arg display_name as hash key instead of a hashed value
---------
Co-authored-by: Adam Curtis <[email protected]>
---
.../optimizer/src/single_distinct_to_groupby.rs | 404 +++++++++++----------
1 file changed, 204 insertions(+), 200 deletions(-)
diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs
b/datafusion/optimizer/src/single_distinct_to_groupby.rs
index 4f9c1ad645..4b1f9a0d14 100644
--- a/datafusion/optimizer/src/single_distinct_to_groupby.rs
+++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs
@@ -22,7 +22,9 @@ use std::sync::Arc;
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{qualified_name, Result};
+use datafusion_common::{
+ internal_err, qualified_name, tree_node::Transformed, DataFusionError,
Result,
+};
use datafusion_expr::builder::project;
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::{
@@ -64,63 +66,55 @@ impl SingleDistinctToGroupBy {
}
/// Check whether all aggregate exprs are distinct on a single field.
-fn is_single_distinct_agg(plan: &LogicalPlan) -> Result<bool> {
- match plan {
- LogicalPlan::Aggregate(Aggregate { aggr_expr, .. }) => {
- let mut fields_set = HashSet::new();
- let mut aggregate_count = 0;
- for expr in aggr_expr {
- if let Expr::AggregateFunction(AggregateFunction {
- func_def: AggregateFunctionDefinition::BuiltIn(fun),
- distinct,
- args,
- filter,
- order_by,
- null_treatment: _,
- }) = expr
- {
- if filter.is_some() || order_by.is_some() {
- return Ok(false);
- }
- aggregate_count += 1;
- if *distinct {
- for e in args {
- fields_set.insert(e.canonical_name());
- }
- } else if !matches!(fun, Sum | Min | Max) {
- return Ok(false);
- }
- } else if let Expr::AggregateFunction(AggregateFunction {
- func_def: AggregateFunctionDefinition::UDF(fun),
- distinct,
- args,
- filter,
- order_by,
- null_treatment: _,
- }) = expr
- {
- if filter.is_some() || order_by.is_some() {
- return Ok(false);
- }
- aggregate_count += 1;
- if *distinct {
- for e in args {
- fields_set.insert(e.canonical_name());
- }
- } else if fun.name() != "SUM"
- && fun.name() != "MIN"
- && fun.name() != "MAX"
- {
- return Ok(false);
- }
- } else {
- return Ok(false);
+fn is_single_distinct_agg(aggr_expr: &[Expr]) -> Result<bool> {
+ let mut fields_set = HashSet::new();
+ let mut aggregate_count = 0;
+ for expr in aggr_expr {
+ if let Expr::AggregateFunction(AggregateFunction {
+ func_def: AggregateFunctionDefinition::BuiltIn(fun),
+ distinct,
+ args,
+ filter,
+ order_by,
+ null_treatment: _,
+ }) = expr
+ {
+ if filter.is_some() || order_by.is_some() {
+ return Ok(false);
+ }
+ aggregate_count += 1;
+ if *distinct {
+ for e in args {
+ fields_set.insert(e);
}
+ } else if !matches!(fun, Sum | Min | Max) {
+ return Ok(false);
+ }
+ } else if let Expr::AggregateFunction(AggregateFunction {
+ func_def: AggregateFunctionDefinition::UDF(fun),
+ distinct,
+ args,
+ filter,
+ order_by,
+ null_treatment: _,
+ }) = expr
+ {
+ if filter.is_some() || order_by.is_some() {
+ return Ok(false);
}
- Ok(aggregate_count == aggr_expr.len() && fields_set.len() == 1)
+ aggregate_count += 1;
+ if *distinct {
+ for e in args {
+ fields_set.insert(e);
+ }
+ } else if fun.name() != "SUM" && fun.name() != "MIN" && fun.name()
!= "MAX" {
+ return Ok(false);
+ }
+ } else {
+ return Ok(false);
}
- _ => Ok(false),
}
+ Ok(aggregate_count == aggr_expr.len() && fields_set.len() == 1)
}
/// Check if the first expr is [Expr::GroupingSet].
@@ -131,9 +125,29 @@ fn contains_grouping_set(expr: &[Expr]) -> bool {
impl OptimizerRule for SingleDistinctToGroupBy {
fn try_optimize(
&self,
- plan: &LogicalPlan,
+ _plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
+ internal_err!("Should have called SingleDistinctToGroupBy::rewrite")
+ }
+
+ fn name(&self) -> &str {
+ "single_distinct_aggregation_to_group_by"
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::TopDown)
+ }
+
+ fn supports_rewrite(&self) -> bool {
+ true
+ }
+
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ _config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
match plan {
LogicalPlan::Aggregate(Aggregate {
input,
@@ -141,167 +155,157 @@ impl OptimizerRule for SingleDistinctToGroupBy {
schema,
group_expr,
..
- }) => {
- if is_single_distinct_agg(plan)? &&
!contains_grouping_set(group_expr) {
- // alias all original group_by exprs
- let (mut inner_group_exprs, out_group_expr_with_alias): (
- Vec<Expr>,
- Vec<(Expr, Option<String>)>,
- ) = group_expr
- .iter()
- .enumerate()
- .map(|(i, group_expr)| {
- if let Expr::Column(_) = group_expr {
- // For Column expressions we can use existing
expression as is.
- (group_expr.clone(), (group_expr.clone(),
None))
- } else {
- // For complex expression write is as alias,
to be able to refer
- // if from parent operators successfully.
- // Consider plan below.
- //
- // Aggregate: groupBy=[[group_alias_0]],
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
- // --Aggregate: groupBy=[[test.a + Int32(1) AS
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32,
alias1:UInt32]\
- // ----TableScan: test [a:UInt32, b:UInt32,
c:UInt32]
- //
- // First aggregate(from bottom) refers to
`test.a` column.
- // Second aggregate refers to the
`group_alias_0` column, Which is a valid field in the first aggregate.
- // If we were to write plan above as below
without alias
- //
- // Aggregate: groupBy=[[test.a + Int32(1)]],
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
- // --Aggregate: groupBy=[[test.a + Int32(1),
test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
- // ----TableScan: test [a:UInt32, b:UInt32,
c:UInt32]
- //
- // Second aggregate refers to the `test.a +
Int32(1)` expression However, its input do not have `test.a` expression in it.
- let alias_str = format!("group_alias_{i}");
- let alias_expr =
group_expr.clone().alias(&alias_str);
- let (qualifier, field) =
schema.qualified_field(i);
+ }) if is_single_distinct_agg(&aggr_expr)?
+ && !contains_grouping_set(&group_expr) =>
+ {
+ let group_size = group_expr.len();
+ // alias all original group_by exprs
+ let (mut inner_group_exprs, out_group_expr_with_alias): (
+ Vec<Expr>,
+ Vec<(Expr, Option<String>)>,
+ ) = group_expr
+ .into_iter()
+ .enumerate()
+ .map(|(i, group_expr)| {
+ if let Expr::Column(_) = group_expr {
+ // For Column expressions we can use existing
expression as is.
+ (group_expr.clone(), (group_expr, None))
+ } else {
+ // For complex expression write is as alias, to be
able to refer
+ // if from parent operators successfully.
+ // Consider plan below.
+ //
+ // Aggregate: groupBy=[[group_alias_0]],
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+ // --Aggregate: groupBy=[[test.a + Int32(1) AS
group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32,
alias1:UInt32]\
+ // ----TableScan: test [a:UInt32, b:UInt32,
c:UInt32]
+ //
+ // First aggregate(from bottom) refers to `test.a`
column.
+ // Second aggregate refers to the `group_alias_0`
column, Which is a valid field in the first aggregate.
+ // If we were to write plan above as below without
alias
+ //
+ // Aggregate: groupBy=[[test.a + Int32(1)]],
aggr=[[COUNT(alias1)]] [group_alias_0:Int32, COUNT(alias1):Int64;N]\
+ // --Aggregate: groupBy=[[test.a + Int32(1),
test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\
+ // ----TableScan: test [a:UInt32, b:UInt32,
c:UInt32]
+ //
+ // Second aggregate refers to the `test.a +
Int32(1)` expression However, its input do not have `test.a` expression in it.
+ let alias_str = format!("group_alias_{i}");
+ let (qualifier, field) = schema.qualified_field(i);
+ (
+ group_expr.alias(alias_str.clone()),
(
- alias_expr,
- (
- col(alias_str),
- Some(qualified_name(qualifier,
field.name())),
- ),
- )
- }
- })
- .unzip();
-
- // and they can be referenced by the alias in the outer
aggr plan
- let outer_group_exprs = out_group_expr_with_alias
- .iter()
- .map(|(out_group_expr, _)| out_group_expr.clone())
- .collect::<Vec<_>>();
-
- // replace the distinct arg with alias
- let mut index = 1;
- let mut group_fields_set = HashSet::new();
- let mut inner_aggr_exprs = vec![];
- let outer_aggr_exprs = aggr_expr
- .iter()
- .map(|aggr_expr| match aggr_expr {
- Expr::AggregateFunction(AggregateFunction {
- func_def:
AggregateFunctionDefinition::BuiltIn(fun),
- args,
- distinct,
- ..
- }) => {
- // is_single_distinct_agg ensure args.len=1
- if *distinct
- &&
group_fields_set.insert(args[0].display_name()?)
- {
- inner_group_exprs.push(
-
args[0].clone().alias(SINGLE_DISTINCT_ALIAS),
- );
+ col(alias_str),
+ Some(qualified_name(qualifier,
field.name())),
+ ),
+ )
+ }
+ })
+ .unzip();
+
+ // replace the distinct arg with alias
+ let mut index = 1;
+ let mut group_fields_set = HashSet::new();
+ let mut inner_aggr_exprs = vec![];
+ let outer_aggr_exprs = aggr_expr
+ .into_iter()
+ .map(|aggr_expr| match aggr_expr {
+ Expr::AggregateFunction(AggregateFunction {
+ func_def:
AggregateFunctionDefinition::BuiltIn(fun),
+ mut args,
+ distinct,
+ ..
+ }) => {
+ if distinct {
+ if args.len() != 1 {
+ return internal_err!("DISTINCT aggregate
should have exactly one argument");
}
+ let arg = args.swap_remove(0);
+ if
group_fields_set.insert(arg.display_name()?) {
+ inner_group_exprs
+
.push(arg.alias(SINGLE_DISTINCT_ALIAS));
+ }
+
Ok(Expr::AggregateFunction(AggregateFunction::new(
+ fun,
+ vec![col(SINGLE_DISTINCT_ALIAS)],
+ false, // intentional to remove distinct
here
+ None,
+ None,
+ None,
+ )))
// if the aggregate function is not distinct,
we need to rewrite it like two phase aggregation
- if !(*distinct) {
- index += 1;
- let alias_str = format!("alias{}", index);
- inner_aggr_exprs.push(
-
Expr::AggregateFunction(AggregateFunction::new(
- fun.clone(),
- args.clone(),
- false,
- None,
- None,
- None,
- ))
- .alias(&alias_str),
- );
-
Ok(Expr::AggregateFunction(AggregateFunction::new(
+ } else {
+ index += 1;
+ let alias_str = format!("alias{}", index);
+ inner_aggr_exprs.push(
+
Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
- vec![col(&alias_str)],
+ args,
false,
None,
None,
None,
- )))
- } else {
-
Ok(Expr::AggregateFunction(AggregateFunction::new(
- fun.clone(),
- vec![col(SINGLE_DISTINCT_ALIAS)],
- false, // intentional to remove
distinct here
- None,
- None,
- None,
- )))
- }
- }
- _ => Ok(aggr_expr.clone()),
- })
- .collect::<Result<Vec<_>>>()?;
-
- // construct the inner AggrPlan
- let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
- input.clone(),
- inner_group_exprs,
- inner_aggr_exprs,
- )?);
-
- // so the aggregates are displayed in the same way even
after the rewrite
- // this optimizer has two kinds of alias:
- // - group_by aggr
- // - aggr expr
- let group_size = group_expr.len();
- let alias_expr: Vec<_> = out_group_expr_with_alias
- .into_iter()
- .map(|(group_expr, original_field)| {
- if let Some(name) = original_field {
- group_expr.alias(name)
- } else {
- group_expr
+ ))
+ .alias(&alias_str),
+ );
+
Ok(Expr::AggregateFunction(AggregateFunction::new(
+ fun,
+ vec![col(&alias_str)],
+ false,
+ None,
+ None,
+ None,
+ )))
}
- })
- .chain(outer_aggr_exprs.iter().enumerate().map(|(idx,
expr)| {
+ }
+ _ => Ok(aggr_expr),
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // construct the inner AggrPlan
+ let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new(
+ input,
+ inner_group_exprs,
+ inner_aggr_exprs,
+ )?);
+
+ let outer_group_exprs = out_group_expr_with_alias
+ .iter()
+ .map(|(expr, _)| expr.clone())
+ .collect();
+
+ // so the aggregates are displayed in the same way even after
the rewrite
+ // this optimizer has two kinds of alias:
+ // - group_by aggr
+ // - aggr expr
+ let alias_expr: Vec<_> = out_group_expr_with_alias
+ .into_iter()
+ .map(|(group_expr, original_field)| {
+ if let Some(name) = original_field {
+ group_expr.alias(name)
+ } else {
+ group_expr
+ }
+ })
+ .chain(outer_aggr_exprs.iter().cloned().enumerate().map(
+ |(idx, expr)| {
let idx = idx + group_size;
let (qualifier, field) =
schema.qualified_field(idx);
let name = qualified_name(qualifier, field.name());
- expr.clone().alias(name)
- }))
- .collect();
-
- let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new(
- Arc::new(inner_agg),
- outer_group_exprs,
- outer_aggr_exprs,
- )?);
- Ok(Some(project(outer_aggr, alias_expr)?))
- } else {
- Ok(None)
- }
+ expr.alias(name)
+ },
+ ))
+ .collect();
+
+ let outer_aggr = LogicalPlan::Aggregate(Aggregate::try_new(
+ Arc::new(inner_agg),
+ outer_group_exprs,
+ outer_aggr_exprs,
+ )?);
+ Ok(Transformed::yes(project(outer_aggr, alias_expr)?))
}
- _ => Ok(None),
+ _ => Ok(Transformed::no(plan)),
}
}
-
- fn name(&self) -> &str {
- "single_distinct_aggregation_to_group_by"
- }
-
- fn apply_order(&self) -> Option<ApplyOrder> {
- Some(ApplyOrder::TopDown)
- }
}
#[cfg(test)]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]