This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9deec2ad72 minor: implement with_new_expressions for AggregateFunctionExpr (#16897)
9deec2ad72 is described below

commit 9deec2ad725813e266dc8c51ec75302fed7412f8
Author: Berkay Şahin <124376117+berkaysynn...@users.noreply.github.com>
AuthorDate: Sat Jul 26 20:15:48 2025 +0300

    minor: implement with_new_expressions for AggregateFunctionExpr (#16897)
    
    * minor
    
    * Update aggregate.rs
---
 datafusion/expr/src/window_state.rs                | 10 +++---
 datafusion/physical-expr/src/aggregate.rs          | 38 ++++++++++++++++++++--
 .../physical-expr/src/expressions/literal.rs       |  2 +-
 datafusion/physical-expr/src/window/aggregate.rs   |  6 +++-
 .../physical-expr/src/window/sliding_aggregate.rs  |  6 +++-
 datafusion/physical-expr/src/window/standard.rs    |  4 +++
 datafusion/physical-expr/src/window/window_expr.rs |  6 ++++
 .../group_values/single_group_by/bytes.rs          | 13 ++++----
 datafusion/physical-plan/src/test.rs               |  2 +-
 9 files changed, 68 insertions(+), 19 deletions(-)

diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs
index a101b8fe4d..014bed5aea 100644
--- a/datafusion/expr/src/window_state.rs
+++ b/datafusion/expr/src/window_state.rs
@@ -34,7 +34,7 @@ use datafusion_common::{
 };
 
 /// Holds the state of evaluating a window function
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct WindowAggState {
     /// The range that we calculate the window function
     pub window_frame_range: Range<usize>,
@@ -112,7 +112,7 @@ impl WindowAggState {
 }
 
 /// This object stores the window frame state for use in incremental calculations.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum WindowFrameContext {
     /// ROWS frames are inherently stateless.
     Rows(Arc<WindowFrame>),
@@ -240,7 +240,7 @@ impl WindowFrameContext {
 }
 
 /// State for each unique partition determined according to PARTITION BY column(s)
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct PartitionBatchState {
     /// The record batch belonging to current partition
     pub record_batch: RecordBatch,
@@ -282,7 +282,7 @@ impl PartitionBatchState {
 /// ranges of data while processing RANGE frames.
 /// Attribute `sort_options` stores the column ordering specified by the ORDER
 /// BY clause. This information is used to calculate the range.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub struct WindowFrameStateRange {
     sort_options: Vec<SortOptions>,
 }
@@ -454,7 +454,7 @@ impl WindowFrameStateRange {
 
 /// This structure encapsulates all the state information we require as we
 /// scan groups of data while processing window frames.
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Clone)]
 pub struct WindowFrameStateGroups {
     /// A tuple containing group values and the row index where the group ends.
     /// Example: [[1, 1], [1, 1], [2, 1], [2, 1], ...] would correspond to
diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs
index 9175c01274..ed30481182 100644
--- a/datafusion/physical-expr/src/aggregate.rs
+++ b/datafusion/physical-expr/src/aggregate.rs
@@ -616,10 +616,42 @@ impl AggregateFunctionExpr {
     /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
     pub fn with_new_expressions(
         &self,
-        _args: Vec<Arc<dyn PhysicalExpr>>,
-        _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
+        args: Vec<Arc<dyn PhysicalExpr>>,
+        order_by_exprs: Vec<Arc<dyn PhysicalExpr>>,
     ) -> Option<AggregateFunctionExpr> {
-        None
+        if args.len() != self.args.len()
+            || (self.order_sensitivity() != AggregateOrderSensitivity::Insensitive
+                && order_by_exprs.len() != self.order_bys.len())
+        {
+            return None;
+        }
+
+        let new_order_bys = self
+            .order_bys
+            .iter()
+            .zip(order_by_exprs)
+            .map(|(req, new_expr)| PhysicalSortExpr {
+                expr: new_expr,
+                options: req.options,
+            })
+            .collect();
+
+        Some(AggregateFunctionExpr {
+            fun: self.fun.clone(),
+            args,
+            return_field: Arc::clone(&self.return_field),
+            name: self.name.clone(),
+            // TODO: Human name should be updated after re-write to not mislead
+            human_display: self.human_display.clone(),
+            schema: self.schema.clone(),
+            order_bys: new_order_bys,
+            ignore_nulls: self.ignore_nulls,
+            ordering_fields: self.ordering_fields.clone(),
+            is_distinct: self.is_distinct,
+            is_reversed: false,
+            input_fields: self.input_fields.clone(),
+            is_nullable: self.is_nullable,
+        })
     }
 
     /// If this function is max, return (output_field, true)
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 1a2ebf000f..6e425ee439 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -36,7 +36,7 @@ use datafusion_expr_common::interval_arithmetic::Interval;
 use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
 
 /// Represents a literal value
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone)]
 pub struct Literal {
     value: ScalarValue,
     field: FieldRef,
diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index 6f0e7c963d..d7287c27de 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 
 use crate::aggregate::AggregateFunctionExpr;
 use crate::window::standard::add_new_ordering_expr_with_partition_by;
-use crate::window::window_expr::AggregateWindowExpr;
+use crate::window::window_expr::{AggregateWindowExpr, WindowFn};
 use crate::window::{
     PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr,
 };
@@ -211,6 +211,10 @@ impl WindowExpr for PlainAggregateWindowExpr {
     fn uses_bounded_memory(&self) -> bool {
         !self.window_frame.end_bound.is_unbounded()
     }
+
+    fn create_window_fn(&self) -> Result<WindowFn> {
+        Ok(WindowFn::Aggregate(self.get_accumulator()?))
+    }
 }
 
 impl AggregateWindowExpr for PlainAggregateWindowExpr {
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 33921a57a6..cb105e773d 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -22,7 +22,7 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use crate::aggregate::AggregateFunctionExpr;
-use crate::window::window_expr::AggregateWindowExpr;
+use crate::window::window_expr::{AggregateWindowExpr, WindowFn};
 use crate::window::{
     PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr,
 };
@@ -175,6 +175,10 @@ impl WindowExpr for SlidingAggregateWindowExpr {
             window_frame: Arc::clone(&self.window_frame),
         }))
     }
+
+    fn create_window_fn(&self) -> Result<WindowFn> {
+        Ok(WindowFn::Aggregate(self.get_accumulator()?))
+    }
 }
 
 impl AggregateWindowExpr for SlidingAggregateWindowExpr {
diff --git a/datafusion/physical-expr/src/window/standard.rs b/datafusion/physical-expr/src/window/standard.rs
index c3761aa78f..7b208ea41f 100644
--- a/datafusion/physical-expr/src/window/standard.rs
+++ b/datafusion/physical-expr/src/window/standard.rs
@@ -275,6 +275,10 @@ impl WindowExpr for StandardWindowExpr {
             false
         }
     }
+
+    fn create_window_fn(&self) -> Result<WindowFn> {
+        Ok(WindowFn::Builtin(self.expr.create_evaluator()?))
+    }
 }
 
 /// Adds a new ordering expression into existing ordering equivalence class(es) based on
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index dd671e0685..ee39b5b245 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -130,6 +130,12 @@ pub trait WindowExpr: Send + Sync + Debug {
     /// Get the reverse expression of this [WindowExpr].
     fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>;
 
+    /// Creates a new instance of the window function evaluator.
+    ///
+    /// Returns `WindowFn::Builtin` for built-in window functions (e.g., ROW_NUMBER, RANK)
+    /// or `WindowFn::Aggregate` for aggregate window functions (e.g., SUM, AVG).
+    fn create_window_fn(&self) -> Result<WindowFn>;
+
     /// Returns all expressions used in the [`WindowExpr`].
     /// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions.
     fn all_expressions(&self) -> WindowPhysicalExpressions {
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
index 9686b8c352..21078ceb8a 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
@@ -15,11 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::mem::size_of;
+
 use crate::aggregates::group_values::GroupValues;
+
 use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
+use datafusion_common::Result;
 use datafusion_expr::EmitTo;
 use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
-use std::mem::size_of;
 
 /// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values
 ///
@@ -42,11 +45,7 @@ impl<O: OffsetSizeTrait> GroupValuesByes<O> {
 }
 
 impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
-    fn intern(
-        &mut self,
-        cols: &[ArrayRef],
-        groups: &mut Vec<usize>,
-    ) -> datafusion_common::Result<()> {
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
         assert_eq!(cols.len(), 1);
 
         // look up / add entries in the table
@@ -85,7 +84,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
         self.num_groups
     }
 
-    fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
+    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
         // Reset the map to default, and convert it into a single array
         let map_contents = self.map.take().into_state();
 
diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs
index be921e0581..349f9955b6 100644
--- a/datafusion/physical-plan/src/test.rs
+++ b/datafusion/physical-plan/src/test.rs
@@ -131,7 +131,7 @@ impl ExecutionPlan for TestMemoryExec {
     }
 
     fn as_any(&self) -> &dyn Any {
-        unimplemented!()
+        self
     }
 
     fn properties(&self) -> &PlanProperties {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to