This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a2eca291ad Stop copying LogicalPlan and Exprs in `ReplaceDistinctWithAggregate` (#10460)
a2eca291ad is described below

commit a2eca291ad9d586222f042ab4c068feeb055526b
Author: ClSlaid <[email protected]>
AuthorDate: Tue May 14 00:03:25 2024 +0800

    Stop copying LogicalPlan and Exprs in `ReplaceDistinctWithAggregate` (#10460)
    
    * patch: implement rewrite for RDWA
    
    Signed-off-by: cailue <[email protected]>
    
    * refactor: rewrite replace_distinct_aggregate
    
    Signed-off-by: 蔡略 <[email protected]>
    
    * patch: recorrect aggr_expr
    
    Signed-off-by: 蔡略 <[email protected]>
    
    * Update datafusion/optimizer/src/replace_distinct_aggregate.rs
    
    ---------
    
    Signed-off-by: cailue <[email protected]>
    Signed-off-by: 蔡略 <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../optimizer/src/replace_distinct_aggregate.rs    | 73 +++++++++++++++-------
 1 file changed, 49 insertions(+), 24 deletions(-)

diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index 4f68e2623f..404f054cb9 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -19,7 +19,9 @@
 use crate::optimizer::{ApplyOrder, ApplyOrder::BottomUp};
 use crate::{OptimizerConfig, OptimizerRule};
 
-use datafusion_common::{Column, Result};
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{internal_err, Column, Result};
+use datafusion_expr::expr_rewriter::normalize_cols;
 use datafusion_expr::utils::expand_wildcard;
 use datafusion_expr::{
     aggregate_function::AggregateFunction as AggregateFunctionFunc, col,
@@ -66,20 +68,24 @@ impl ReplaceDistinctWithAggregate {
 }
 
 impl OptimizerRule for ReplaceDistinctWithAggregate {
-    fn try_optimize(
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
         &self,
-        plan: &LogicalPlan,
+        plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
-    ) -> Result<Option<LogicalPlan>> {
+    ) -> Result<Transformed<LogicalPlan>> {
         match plan {
             LogicalPlan::Distinct(Distinct::All(input)) => {
-                let group_expr = expand_wildcard(input.schema(), input, None)?;
-                let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
-                    input.clone(),
+                let group_expr = expand_wildcard(input.schema(), &input, None)?;
+                let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new(
+                    input,
                     group_expr,
                     vec![],
                 )?);
-                Ok(Some(aggregate))
+                Ok(Transformed::yes(aggr_plan))
             }
             LogicalPlan::Distinct(Distinct::On(DistinctOn {
                 select_expr,
@@ -88,13 +94,15 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
                 input,
                 schema,
             })) => {
+                let expr_cnt = on_expr.len();
+
                 // Construct the aggregation expression to be used to fetch the selected expressions.
                 let aggr_expr = select_expr
-                    .iter()
+                    .into_iter()
                     .map(|e| {
                         Expr::AggregateFunction(AggregateFunction::new(
                             AggregateFunctionFunc::FirstValue,
-                            vec![e.clone()],
+                            vec![e],
                             false,
                             None,
                             sort_expr.clone(),
@@ -103,45 +111,62 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
                     })
                     .collect::<Vec<Expr>>();
 
+                let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?;
+                let group_expr = normalize_cols(on_expr, input.as_ref())?;
+
                 // Build the aggregation plan
-                let plan = LogicalPlanBuilder::from(input.as_ref().clone())
-                    .aggregate(on_expr.clone(), aggr_expr.to_vec())?
-                    .build()?;
+                let plan = LogicalPlan::Aggregate(Aggregate::try_new(
+                    input, group_expr, aggr_expr,
+                )?);
+                // TODO use LogicalPlanBuilder directly rather than recreating the Aggregate
+                // when https://github.com/apache/datafusion/issues/10485 is available
+                let lpb = LogicalPlanBuilder::from(plan);
 
-                let plan = if let Some(sort_expr) = sort_expr {
+                let plan = if let Some(mut sort_expr) = sort_expr {
                     // While sort expressions were used in the `FIRST_VALUE` aggregation itself above,
                     // this on it's own isn't enough to guarantee the proper output order of the grouping
                     // (`ON`) expression, so we need to sort those as well.
-                    LogicalPlanBuilder::from(plan)
-                        .sort(sort_expr[..on_expr.len()].to_vec())?
-                        .build()?
+
+                    // truncate the sort_expr to the length of on_expr
+                    sort_expr.truncate(expr_cnt);
+
+                    lpb.sort(sort_expr)?.build()?
                 } else {
-                    plan
+                    lpb.build()?
                 };
 
                 // Whereas the aggregation plan by default outputs both the grouping and the aggregation
                 // expressions, for `DISTINCT ON` we only need to emit the original selection expressions.
+
                 let project_exprs = plan
                     .schema()
                     .iter()
-                    .skip(on_expr.len())
+                    .skip(expr_cnt)
                     .zip(schema.iter())
                     .map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
-                        Ok(col(Column::from((new_qualifier, new_field)))
-                            .alias_qualified(old_qualifier.cloned(), old_field.name()))
+                        col(Column::from((new_qualifier, new_field)))
+                            .alias_qualified(old_qualifier.cloned(), old_field.name())
                     })
-                    .collect::<Result<Vec<Expr>>>()?;
+                    .collect::<Vec<Expr>>();
 
                 let plan = LogicalPlanBuilder::from(plan)
                     .project(project_exprs)?
                     .build()?;
 
-                Ok(Some(plan))
+                Ok(Transformed::yes(plan))
             }
-            _ => Ok(None),
+            _ => Ok(Transformed::no(plan)),
         }
     }
 
+    fn try_optimize(
+        &self,
+        _plan: &LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called ReplaceDistinctWithAggregate::rewrite")
+    }
+
     fn name(&self) -> &str {
         "replace_distinct_aggregate"
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to