This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a2eca291ad Stop copying LogicalPlan and Exprs in
`ReplaceDistinctWithAggregate` (#10460)
a2eca291ad is described below
commit a2eca291ad9d586222f042ab4c068feeb055526b
Author: ClSlaid <[email protected]>
AuthorDate: Tue May 14 00:03:25 2024 +0800
Stop copying LogicalPlan and Exprs in `ReplaceDistinctWithAggregate`
(#10460)
* patch: implement rewrite for RDWA
Signed-off-by: cailue <[email protected]>
* refactor: rewrite replace_distinct_aggregate
Signed-off-by: 蔡略 <[email protected]>
* patch: recorrect aggr_expr
Signed-off-by: 蔡略 <[email protected]>
* Update datafusion/optimizer/src/replace_distinct_aggregate.rs
---------
Signed-off-by: cailue <[email protected]>
Signed-off-by: 蔡略 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
.../optimizer/src/replace_distinct_aggregate.rs | 73 +++++++++++++++-------
1 file changed, 49 insertions(+), 24 deletions(-)
diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs
b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index 4f68e2623f..404f054cb9 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -19,7 +19,9 @@
use crate::optimizer::{ApplyOrder, ApplyOrder::BottomUp};
use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{Column, Result};
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{internal_err, Column, Result};
+use datafusion_expr::expr_rewriter::normalize_cols;
use datafusion_expr::utils::expand_wildcard;
use datafusion_expr::{
aggregate_function::AggregateFunction as AggregateFunctionFunc, col,
@@ -66,20 +68,24 @@ impl ReplaceDistinctWithAggregate {
}
impl OptimizerRule for ReplaceDistinctWithAggregate {
- fn try_optimize(
+ fn supports_rewrite(&self) -> bool {
+ true
+ }
+
+ fn rewrite(
&self,
- plan: &LogicalPlan,
+ plan: LogicalPlan,
_config: &dyn OptimizerConfig,
- ) -> Result<Option<LogicalPlan>> {
+ ) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Distinct(Distinct::All(input)) => {
- let group_expr = expand_wildcard(input.schema(), input, None)?;
- let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
- input.clone(),
+ let group_expr = expand_wildcard(input.schema(), &input,
None)?;
+ let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new(
+ input,
group_expr,
vec![],
)?);
- Ok(Some(aggregate))
+ Ok(Transformed::yes(aggr_plan))
}
LogicalPlan::Distinct(Distinct::On(DistinctOn {
select_expr,
@@ -88,13 +94,15 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
input,
schema,
})) => {
+ let expr_cnt = on_expr.len();
+
// Construct the aggregation expression to be used to fetch
the selected expressions.
let aggr_expr = select_expr
- .iter()
+ .into_iter()
.map(|e| {
Expr::AggregateFunction(AggregateFunction::new(
AggregateFunctionFunc::FirstValue,
- vec![e.clone()],
+ vec![e],
false,
None,
sort_expr.clone(),
@@ -103,45 +111,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
})
.collect::<Vec<Expr>>();
+ let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?;
+ let group_expr = normalize_cols(on_expr, input.as_ref())?;
+
// Build the aggregation plan
- let plan = LogicalPlanBuilder::from(input.as_ref().clone())
- .aggregate(on_expr.clone(), aggr_expr.to_vec())?
- .build()?;
+ let plan = LogicalPlan::Aggregate(Aggregate::try_new(
+ input, group_expr, aggr_expr,
+ )?);
+ // TODO use LogicalPlanBuilder directly rather than recreating
the Aggregate
+ // when https://github.com/apache/datafusion/issues/10485 is
available
+ let lpb = LogicalPlanBuilder::from(plan);
- let plan = if let Some(sort_expr) = sort_expr {
+ let plan = if let Some(mut sort_expr) = sort_expr {
// While sort expressions were used in the `FIRST_VALUE`
aggregation itself above,
// this on its own isn't enough to guarantee the proper
output order of the grouping
// (`ON`) expression, so we need to sort those as well.
- LogicalPlanBuilder::from(plan)
- .sort(sort_expr[..on_expr.len()].to_vec())?
- .build()?
+
+ // truncate the sort_expr to the length of on_expr
+ sort_expr.truncate(expr_cnt);
+
+ lpb.sort(sort_expr)?.build()?
} else {
- plan
+ lpb.build()?
};
// Whereas the aggregation plan by default outputs both the
grouping and the aggregation
// expressions, for `DISTINCT ON` we only need to emit the
original selection expressions.
+
let project_exprs = plan
.schema()
.iter()
- .skip(on_expr.len())
+ .skip(expr_cnt)
.zip(schema.iter())
.map(|((new_qualifier, new_field), (old_qualifier,
old_field))| {
- Ok(col(Column::from((new_qualifier, new_field)))
- .alias_qualified(old_qualifier.cloned(),
old_field.name()))
+ col(Column::from((new_qualifier, new_field)))
+ .alias_qualified(old_qualifier.cloned(),
old_field.name())
})
- .collect::<Result<Vec<Expr>>>()?;
+ .collect::<Vec<Expr>>();
let plan = LogicalPlanBuilder::from(plan)
.project(project_exprs)?
.build()?;
- Ok(Some(plan))
+ Ok(Transformed::yes(plan))
}
- _ => Ok(None),
+ _ => Ok(Transformed::no(plan)),
}
}
+ fn try_optimize(
+ &self,
+ _plan: &LogicalPlan,
+ _config: &dyn OptimizerConfig,
+ ) -> Result<Option<LogicalPlan>> {
+ internal_err!("Should have called
ReplaceDistinctWithAggregate::rewrite")
+ }
+
fn name(&self) -> &str {
"replace_distinct_aggregate"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]