This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 0c30374049 Improve performance of constant aggregate window expression (#16234) 0c30374049 is described below commit 0c3037404929fc3a3c4fbf6b9b7325d422ce10bd Author: suibianwanwan <95014391+suibianwanw...@users.noreply.github.com> AuthorDate: Thu Jun 5 04:21:41 2025 +0800 Improve performance of constant aggregate window expression (#16234) * Improve performance of constant aggregate window expression * Update datafusion/physical-expr/src/window/aggregate.rs Co-authored-by: Jonathan Chen <chenleejonat...@gmail.com> * fmt * Update datafusion/physical-expr/src/window/aggregate.rs Co-authored-by: Yongting You <2010you...@gmail.com> * Rename * fmt --------- Co-authored-by: Jonathan Chen <chenleejonat...@gmail.com> Co-authored-by: Yongting You <2010you...@gmail.com> --- datafusion/physical-expr/src/window/aggregate.rs | 34 +++++++++++++++++++++- .../physical-expr/src/window/sliding_aggregate.rs | 4 +++ datafusion/physical-expr/src/window/window_expr.rs | 11 ++++++- 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 9b95979613..dae0667afb 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -34,7 +34,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::FieldRef; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result, ScalarValue}; -use datafusion_expr::{Accumulator, WindowFrame}; +use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits}; use datafusion_physical_expr_common::sort_expr::LexOrdering; /// A window expr that takes the form of an aggregate function. @@ -46,6 +46,7 @@ pub struct PlainAggregateWindowExpr { partition_by: Vec<Arc<dyn PhysicalExpr>>, order_by: LexOrdering, window_frame: Arc<WindowFrame>, + is_constant_in_partition: bool, } impl PlainAggregateWindowExpr { @@ -56,11 +57,14 @@ impl PlainAggregateWindowExpr { order_by: &LexOrdering, window_frame: Arc<WindowFrame>, ) -> Self { + let is_constant_in_partition = + Self::is_window_constant_in_partition(order_by, &window_frame); Self { aggregate, partition_by: partition_by.to_vec(), order_by: order_by.clone(), window_frame, + is_constant_in_partition, } } @@ -85,6 +89,30 @@ impl PlainAggregateWindowExpr { ); } } + + // Returns true if every row in the partition has the same window frame. This allows + // for preventing bound + function calculation for every row due to the values being the + // same. + // + // This occurs when both bounds fall under either condition below: + // 1. Bound is unbounded (`Preceding` or `Following`) + // 2. Bound is `CurrentRow` while using `Range` units with no order by clause + // This results in an invalid range specification. Following PostgreSQL’s convention, + // we interpret this as the entire partition being used for the current window frame. + fn is_window_constant_in_partition( + order_by: &LexOrdering, + window_frame: &WindowFrame, + ) -> bool { + let is_constant_bound = |bound: &WindowFrameBound| match bound { + WindowFrameBound::CurrentRow => { + window_frame.units == WindowFrameUnits::Range && order_by.is_empty() + } + _ => bound.is_unbounded(), + }; + + is_constant_bound(&window_frame.start_bound) + && is_constant_bound(&window_frame.end_bound) + } } /// peer based evaluation based on the fact that batch is pre-sorted given the sort columns @@ -213,4 +241,8 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr { accumulator.evaluate() } } + + fn is_constant_in_partition(&self) -> bool { + self.is_constant_in_partition + } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 2b22299f93..09d6af7487 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -210,4 +210,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr { accumulator.evaluate() } } + + fn is_constant_in_partition(&self) -> bool { + false + } } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 8d72604a6a..70a73c44ae 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -186,6 +186,10 @@ pub trait AggregateWindowExpr: WindowExpr { accumulator: &mut Box<dyn Accumulator>, ) -> Result<ScalarValue>; + /// Indicates whether this window function always produces the same result + /// for all rows in the partition. + fn is_constant_in_partition(&self) -> bool; + /// Evaluates the window function against the batch. fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> { let mut accumulator = self.get_accumulator()?; @@ -272,8 +276,13 @@ pub trait AggregateWindowExpr: WindowExpr { not_end: bool, ) -> Result<ArrayRef> { let values = self.evaluate_args(record_batch)?; - let order_bys = get_orderby_values(self.order_by_columns(record_batch)?); + if self.is_constant_in_partition() { + accumulator.update_batch(&values)?; + let value = accumulator.evaluate()?; + return value.to_array_of_size(record_batch.num_rows()); + } + let order_bys = get_orderby_values(self.order_by_columns(record_batch)?); let most_recent_row_order_bys = most_recent_row .map(|batch| self.order_by_columns(batch)) .transpose()? --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org