This is an automated email from the ASF dual-hosted git repository. comphead pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 9deec2ad72 minor: implement with_new_expressions for AggregateFunctionExpr (#16897) 9deec2ad72 is described below commit 9deec2ad725813e266dc8c51ec75302fed7412f8 Author: Berkay Şahin <124376117+berkaysynn...@users.noreply.github.com> AuthorDate: Sat Jul 26 20:15:48 2025 +0300 minor: implement with_new_expressions for AggregateFunctionExpr (#16897) * minor * Update aggregate.rs --- datafusion/expr/src/window_state.rs | 10 +++--- datafusion/physical-expr/src/aggregate.rs | 38 ++++++++++++++++++++-- .../physical-expr/src/expressions/literal.rs | 2 +- datafusion/physical-expr/src/window/aggregate.rs | 6 +++- .../physical-expr/src/window/sliding_aggregate.rs | 6 +++- datafusion/physical-expr/src/window/standard.rs | 4 +++ datafusion/physical-expr/src/window/window_expr.rs | 6 ++++ .../group_values/single_group_by/bytes.rs | 13 ++++---- datafusion/physical-plan/src/test.rs | 2 +- 9 files changed, 68 insertions(+), 19 deletions(-) diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs index a101b8fe4d..014bed5aea 100644 --- a/datafusion/expr/src/window_state.rs +++ b/datafusion/expr/src/window_state.rs @@ -34,7 +34,7 @@ use datafusion_common::{ }; /// Holds the state of evaluating a window function -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct WindowAggState { /// The range that we calculate the window function pub window_frame_range: Range<usize>, @@ -112,7 +112,7 @@ impl WindowAggState { } /// This object stores the window frame state for use in incremental calculations. -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum WindowFrameContext { /// ROWS frames are inherently stateless. Rows(Arc<WindowFrame>), @@ -240,7 +240,7 @@ impl WindowFrameContext { } /// State for each unique partition determined according to PARTITION BY column(s) -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct PartitionBatchState { /// The record batch belonging to current partition pub record_batch: RecordBatch, @@ -282,7 +282,7 @@ impl PartitionBatchState { /// ranges of data while processing RANGE frames. /// Attribute `sort_options` stores the column ordering specified by the ORDER /// BY clause. This information is used to calculate the range. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct WindowFrameStateRange { sort_options: Vec<SortOptions>, } @@ -454,7 +454,7 @@ impl WindowFrameStateRange { /// This structure encapsulates all the state information we require as we /// scan groups of data while processing window frames. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct WindowFrameStateGroups { /// A tuple containing group values and the row index where the group ends. /// Example: [[1, 1], [1, 1], [2, 1], [2, 1], ...] would correspond to diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 9175c01274..ed30481182 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -616,10 +616,42 @@ impl AggregateFunctionExpr { /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`. pub fn with_new_expressions( &self, - _args: Vec<Arc<dyn PhysicalExpr>>, - _order_by_exprs: Vec<Arc<dyn PhysicalExpr>>, + args: Vec<Arc<dyn PhysicalExpr>>, + order_by_exprs: Vec<Arc<dyn PhysicalExpr>>, ) -> Option<AggregateFunctionExpr> { - None + if args.len() != self.args.len() + || (self.order_sensitivity() != AggregateOrderSensitivity::Insensitive + && order_by_exprs.len() != self.order_bys.len()) + { + return None; + } + + let new_order_bys = self + .order_bys + .iter() + .zip(order_by_exprs) + .map(|(req, new_expr)| PhysicalSortExpr { + expr: new_expr, + options: req.options, + }) + .collect(); + + Some(AggregateFunctionExpr { + fun: self.fun.clone(), + args, + return_field: Arc::clone(&self.return_field), + name: self.name.clone(), + // TODO: Human name should be updated after re-write to not mislead + human_display: self.human_display.clone(), + schema: self.schema.clone(), + order_bys: new_order_bys, + ignore_nulls: self.ignore_nulls, + ordering_fields: self.ordering_fields.clone(), + is_distinct: self.is_distinct, + is_reversed: false, + input_fields: self.input_fields.clone(), + is_nullable: self.is_nullable, + }) } /// If this function is max, return (output_field, true) diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 1a2ebf000f..6e425ee439 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -36,7 +36,7 @@ use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties}; /// Represents a literal value -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub struct Literal { value: ScalarValue, field: FieldRef, diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 6f0e7c963d..d7287c27de 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use crate::aggregate::AggregateFunctionExpr; use crate::window::standard::add_new_ordering_expr_with_partition_by; -use crate::window::window_expr::AggregateWindowExpr; +use crate::window::window_expr::{AggregateWindowExpr, WindowFn}; use crate::window::{ PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr, }; @@ -211,6 +211,10 @@ impl WindowExpr for PlainAggregateWindowExpr { fn uses_bounded_memory(&self) -> bool { !self.window_frame.end_bound.is_unbounded() } + + fn create_window_fn(&self) -> Result<WindowFn> { + Ok(WindowFn::Aggregate(self.get_accumulator()?)) + } } impl AggregateWindowExpr for PlainAggregateWindowExpr { diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 33921a57a6..cb105e773d 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -22,7 +22,7 @@ use std::ops::Range; use std::sync::Arc; use crate::aggregate::AggregateFunctionExpr; -use crate::window::window_expr::AggregateWindowExpr; +use crate::window::window_expr::{AggregateWindowExpr, WindowFn}; use crate::window::{ PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr, }; @@ -175,6 +175,10 @@ impl WindowExpr for SlidingAggregateWindowExpr { window_frame: Arc::clone(&self.window_frame), })) } + + fn create_window_fn(&self) -> Result<WindowFn> { + Ok(WindowFn::Aggregate(self.get_accumulator()?)) + } } impl AggregateWindowExpr for SlidingAggregateWindowExpr { diff --git a/datafusion/physical-expr/src/window/standard.rs b/datafusion/physical-expr/src/window/standard.rs index c3761aa78f..7b208ea41f 100644 --- a/datafusion/physical-expr/src/window/standard.rs +++ b/datafusion/physical-expr/src/window/standard.rs @@ -275,6 +275,10 @@ impl WindowExpr for StandardWindowExpr { false } } + + fn create_window_fn(&self) -> Result<WindowFn> { + Ok(WindowFn::Builtin(self.expr.create_evaluator()?)) + } } /// Adds a new ordering expression into existing ordering equivalence class(es) based on diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index dd671e0685..ee39b5b245 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -130,6 +130,12 @@ pub trait WindowExpr: Send + Sync + Debug { /// Get the reverse expression of this [WindowExpr]. fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>>; + /// Creates a new instance of the window function evaluator. + /// + /// Returns `WindowFn::Builtin` for built-in window functions (e.g., ROW_NUMBER, RANK) + /// or `WindowFn::Aggregate` for aggregate window functions (e.g., SUM, AVG). + fn create_window_fn(&self) -> Result<WindowFn>; + /// Returns all expressions used in the [`WindowExpr`]. /// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions. fn all_expressions(&self) -> WindowPhysicalExpressions { diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index 9686b8c352..21078ceb8a 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -15,11 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::mem::size_of; + use crate::aggregates::group_values::GroupValues; + use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch}; +use datafusion_common::Result; use datafusion_expr::EmitTo; use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType}; -use std::mem::size_of; /// A [`GroupValues`] storing single column of Utf8/LargeUtf8/Binary/LargeBinary values /// @@ -42,11 +45,7 @@ impl<O: OffsetSizeTrait> GroupValuesByes<O> { } impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> { - fn intern( - &mut self, - cols: &[ArrayRef], - groups: &mut Vec<usize>, - ) -> datafusion_common::Result<()> { + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> { assert_eq!(cols.len(), 1); // look up / add entries in the table @@ -85,7 +84,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> { self.num_groups } - fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> { + fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { // Reset the map to default, and convert it into a single array let map_contents = self.map.take().into_state(); diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index be921e0581..349f9955b6 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -131,7 +131,7 @@ impl ExecutionPlan for TestMemoryExec { } fn as_any(&self) -> &dyn Any { - unimplemented!() + self } fn properties(&self) -> &PlanProperties { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org