This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 68f84761d2 Add support for `AggregateExpr`, `WindowExpr` rewrite. (#10742)
68f84761d2 is described below

commit 68f84761d2aa8608b34981f5279a7685bf896dba
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Jun 1 00:32:31 2024 +0300

    Add support for `AggregateExpr`, `WindowExpr` rewrite. (#10742)
    
    * Initial commit
    
    * Minor changes
    
    * Minor changes
    
    * Update comments
---
 .../physical-expr-common/src/aggregate/mod.rs      | 34 +++++++++++++++++++
 datafusion/physical-expr/src/aggregate/count.rs    | 15 +++++++++
 datafusion/physical-expr/src/lib.rs                |  4 ++-
 .../physical-expr/src/window/sliding_aggregate.rs  | 25 ++++++++++++++
 datafusion/physical-expr/src/window/window_expr.rs | 39 ++++++++++++++++++++++
 5 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs
index 503e2d8f97..78c7d40b87 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -185,6 +185,40 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet")
     }
+
+    /// Returns all expressions used in the [`AggregateExpr`].
+    /// These are (1) the function arguments and (2) the ORDER BY expressions (empty if there are none).
+    fn all_expressions(&self) -> AggregatePhysicalExpressions {
+        let args = self.expressions();
+        let order_bys = self.order_bys().unwrap_or(&[]); // `None` is treated as "no ordering"
+        let order_by_exprs = order_bys
+            .iter()
+            .map(|sort_expr| sort_expr.expr.clone()) // sort options are dropped; only the exprs are kept
+            .collect::<Vec<_>>();
+        AggregatePhysicalExpressions {
+            args,
+            order_by_exprs,
+        }
+    }
+
+    /// Rewrites this [`AggregateExpr`] with the given new expressions. The arguments should be consistent
+    /// with the return value of the [`AggregateExpr::all_expressions`] method.
+    /// Returns `Some(Arc<dyn AggregateExpr>)` if rewriting is supported, otherwise `None` (the default).
+    fn with_new_expressions(
+        &self,
+        _args: Vec<Arc<dyn PhysicalExpr>>,
+        _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Option<Arc<dyn AggregateExpr>> {
+        None // default: rewriting is not supported
+    }
+}
+
+/// Physical expressions used inside an [`AggregateExpr`], as returned by [`AggregateExpr::all_expressions`].
+pub struct AggregatePhysicalExpressions {
+    /// Aggregate function arguments (as returned by `AggregateExpr::expressions`)
+    pub args: Vec<Arc<dyn PhysicalExpr>>,
+    /// ORDER BY expressions, without their sort options
+    pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
 }
 
 /// Physical aggregate expression of a UDAF.
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index e3660221e6..aad18a82ab 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -260,6 +260,21 @@ impl AggregateExpr for Count {
         // instantiate specialized accumulator
         Ok(Box::new(CountGroupsAccumulator::new()))
     }
+
+    fn with_new_expressions( // rebuilds this `Count` with new argument expressions
+        &self,
+        args: Vec<Arc<dyn PhysicalExpr>>,
+        order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Option<Arc<dyn AggregateExpr>> {
+        debug_assert_eq!(self.exprs.len(), args.len()); // arity must match `self.exprs`
+        debug_assert!(order_by_exprs.is_empty()); // the rebuilt `Count` has no ORDER BY field
+        Some(Arc::new(Count {
+            name: self.name.clone(),
+            data_type: self.data_type.clone(),
+            nullable: self.nullable,
+            exprs: args, // only the argument expressions are replaced
+        }))
+    }
 }
 
 impl PartialEq<dyn Any> for Count {
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 1bdf082b2e..72f5f2d50c 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -41,7 +41,9 @@ pub mod execution_props {
 
 pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState};
 pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
-pub use datafusion_physical_expr_common::aggregate::AggregateExpr;
+pub use datafusion_physical_expr_common::aggregate::{
+    AggregateExpr, AggregatePhysicalExpressions,
+};
 pub use equivalence::EquivalenceProperties;
 pub use partitioning::{Distribution, Partitioning};
 pub use physical_expr::{
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 1494129cf8..961f0884dd 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -141,6 +141,31 @@ impl WindowExpr for SlidingAggregateWindowExpr {
     fn uses_bounded_memory(&self) -> bool {
         !self.window_frame.end_bound.is_unbounded()
     }
+
+    fn with_new_expressions( // rebuilds this window expr; `None` if the aggregate cannot be rewritten
+        &self,
+        args: Vec<Arc<dyn PhysicalExpr>>,
+        partition_bys: Vec<Arc<dyn PhysicalExpr>>,
+        order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Option<Arc<dyn WindowExpr>> {
+        debug_assert_eq!(self.order_by.len(), order_by_exprs.len()); // one new expr per sort key; `zip` below truncates on mismatch
+
+        let new_order_by = self
+            .order_by
+            .iter()
+            .zip(order_by_exprs)
+            .map(|(req, new_expr)| PhysicalSortExpr {
+                expr: new_expr,
+                options: req.options, // keep the existing sort options
+            })
+            .collect::<Vec<_>>();
+        Some(Arc::new(SlidingAggregateWindowExpr {
+            aggregate: self.aggregate.with_new_expressions(args, vec![])?, // `None` propagates; NOTE(review): aggregate's own ORDER BY exprs are passed empty — confirm
+            partition_by: partition_bys,
+            order_by: new_order_by,
+            window_frame: self.window_frame.clone(),
+        }))
+    }
 }
 
 impl AggregateWindowExpr for SlidingAggregateWindowExpr {
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index dd9514c69a..065371d9e4 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -128,6 +128,45 @@ pub trait WindowExpr: Send + Sync + Debug {
 
     /// Get the reverse expression of this [WindowExpr].
     fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;
+
+    /// Returns all expressions used in the [`WindowExpr`].
+    /// These are (1) the function arguments, (2) the PARTITION BY expressions and (3) the ORDER BY expressions.
+    fn all_expressions(&self) -> WindowPhysicalExpressions {
+        let args = self.expressions();
+        let partition_by_exprs = self.partition_by().to_vec();
+        let order_by_exprs = self
+            .order_by()
+            .iter()
+            .map(|sort_expr| sort_expr.expr.clone()) // sort options are dropped; only the exprs are kept
+            .collect::<Vec<_>>();
+        WindowPhysicalExpressions {
+            args,
+            partition_by_exprs,
+            order_by_exprs,
+        }
+    }
+
+    /// Rewrites this [`WindowExpr`] with the given new expressions. The arguments should be consistent
+    /// with the return value of the [`WindowExpr::all_expressions`] method.
+    /// Returns `Some(Arc<dyn WindowExpr>)` if rewriting is supported, otherwise `None` (the default).
+    fn with_new_expressions(
+        &self,
+        _args: Vec<Arc<dyn PhysicalExpr>>,
+        _partition_bys: Vec<Arc<dyn PhysicalExpr>>,
+        _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Option<Arc<dyn WindowExpr>> {
+        None // default: rewriting is not supported
+    }
+}
+
+/// Physical expressions used inside a [`WindowExpr`], as returned by [`WindowExpr::all_expressions`].
+pub struct WindowPhysicalExpressions {
+    /// Window function arguments (as returned by `WindowExpr::expressions`)
+    pub args: Vec<Arc<dyn PhysicalExpr>>,
+    /// PARTITION BY expressions
+    pub partition_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    /// ORDER BY expressions, without their sort options
+    pub order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
 }
 
 /// Extension trait that adds common functionality to [`AggregateWindowExpr`]s


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to