This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c30374049 Improve performance of constant aggregate window expression (#16234)
0c30374049 is described below

commit 0c3037404929fc3a3c4fbf6b9b7325d422ce10bd
Author: suibianwanwan <95014391+suibianwanw...@users.noreply.github.com>
AuthorDate: Thu Jun 5 04:21:41 2025 +0800

    Improve performance of constant aggregate window expression (#16234)
    
    * Improve performance of constant aggregate window expression
    
    * Update datafusion/physical-expr/src/window/aggregate.rs
    
    Co-authored-by: Jonathan Chen <chenleejonat...@gmail.com>
    
    * fmt
    
    * Update datafusion/physical-expr/src/window/aggregate.rs
    
    Co-authored-by: Yongting You <2010you...@gmail.com>
    
    * Rename
    
    * fmt
    
    ---------
    
    Co-authored-by: Jonathan Chen <chenleejonat...@gmail.com>
    Co-authored-by: Yongting You <2010you...@gmail.com>
---
 datafusion/physical-expr/src/window/aggregate.rs   | 34 +++++++++++++++++++++-
 .../physical-expr/src/window/sliding_aggregate.rs  |  4 +++
 datafusion/physical-expr/src/window/window_expr.rs | 11 ++++++-
 3 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs
index 9b95979613..dae0667afb 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -34,7 +34,7 @@ use arrow::array::ArrayRef;
 use arrow::datatypes::FieldRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{DataFusionError, Result, ScalarValue};
-use datafusion_expr::{Accumulator, WindowFrame};
+use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, 
WindowFrameUnits};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 /// A window expr that takes the form of an aggregate function.
@@ -46,6 +46,7 @@ pub struct PlainAggregateWindowExpr {
     partition_by: Vec<Arc<dyn PhysicalExpr>>,
     order_by: LexOrdering,
     window_frame: Arc<WindowFrame>,
+    is_constant_in_partition: bool,
 }
 
 impl PlainAggregateWindowExpr {
@@ -56,11 +57,14 @@ impl PlainAggregateWindowExpr {
         order_by: &LexOrdering,
         window_frame: Arc<WindowFrame>,
     ) -> Self {
+        let is_constant_in_partition =
+            Self::is_window_constant_in_partition(order_by, &window_frame);
         Self {
             aggregate,
             partition_by: partition_by.to_vec(),
             order_by: order_by.clone(),
             window_frame,
+            is_constant_in_partition,
         }
     }
 
@@ -85,6 +89,30 @@ impl PlainAggregateWindowExpr {
             );
         }
     }
+
+    // Returns true if every row in the partition has the same window frame. 
This allows
+    // for preventing bound + function calculation for every row due to the 
values being the
+    // same.
+    //
+    // This occurs when both bounds fall under either condition below:
+    //  1. Bound is unbounded (`Preceding` or `Following`)
+    //  2. Bound is `CurrentRow` while using `Range` units with no order by 
clause
+    //  This results in an invalid range specification. Following PostgreSQL’s 
convention,
+    //  we interpret this as the entire partition being used for the current 
window frame.
+    fn is_window_constant_in_partition(
+        order_by: &LexOrdering,
+        window_frame: &WindowFrame,
+    ) -> bool {
+        let is_constant_bound = |bound: &WindowFrameBound| match bound {
+            WindowFrameBound::CurrentRow => {
+                window_frame.units == WindowFrameUnits::Range && 
order_by.is_empty()
+            }
+            _ => bound.is_unbounded(),
+        };
+
+        is_constant_bound(&window_frame.start_bound)
+            && is_constant_bound(&window_frame.end_bound)
+    }
 }
 
 /// peer based evaluation based on the fact that batch is pre-sorted given the 
sort columns
@@ -213,4 +241,8 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
             accumulator.evaluate()
         }
     }
+
+    fn is_constant_in_partition(&self) -> bool {
+        self.is_constant_in_partition
+    }
 }
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 2b22299f93..09d6af7487 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -210,4 +210,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
             accumulator.evaluate()
         }
     }
+
+    fn is_constant_in_partition(&self) -> bool {
+        false
+    }
 }
diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs
index 8d72604a6a..70a73c44ae 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -186,6 +186,10 @@ pub trait AggregateWindowExpr: WindowExpr {
         accumulator: &mut Box<dyn Accumulator>,
     ) -> Result<ScalarValue>;
 
+    /// Indicates whether this window function always produces the same result
+    /// for all rows in the partition.
+    fn is_constant_in_partition(&self) -> bool;
+
     /// Evaluates the window function against the batch.
     fn aggregate_evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
         let mut accumulator = self.get_accumulator()?;
@@ -272,8 +276,13 @@ pub trait AggregateWindowExpr: WindowExpr {
         not_end: bool,
     ) -> Result<ArrayRef> {
         let values = self.evaluate_args(record_batch)?;
-        let order_bys = 
get_orderby_values(self.order_by_columns(record_batch)?);
 
+        if self.is_constant_in_partition() {
+            accumulator.update_batch(&values)?;
+            let value = accumulator.evaluate()?;
+            return value.to_array_of_size(record_batch.num_rows());
+        }
+        let order_bys = 
get_orderby_values(self.order_by_columns(record_batch)?);
         let most_recent_row_order_bys = most_recent_row
             .map(|batch| self.order_by_columns(batch))
             .transpose()?


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to