This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 79f67b8ef2 feat: implement partition_statistics for WindowAggExec (#18534)
79f67b8ef2 is described below

commit 79f67b8ef2d3016c866ae92d7d2f56d1a7830045
Author: Dongpo Liu <[email protected]>
AuthorDate: Wed Dec 31 08:12:43 2025 +0100

    feat: implement partition_statistics for WindowAggExec (#18534)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Ref https://github.com/apache/datafusion/issues/15873
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
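    Implemented `partition_statistics` for `WindowAggExec`. Requests for a
    specific partition (`Some(idx)`) previously returned unknown statistics;
    they now propagate the input's statistics for that partition (row count
    and input column statistics), with unknown statistics for the added
    window columns. `statistics()` now delegates to
    `partition_statistics(None)`.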
    
    ## Are these changes tested?
    
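    Yes. Added `test_statistic_by_partition_of_window_agg`, which checks the
    per-partition statistics against expected values and against the data
    produced by executing the plan.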
    
    ## Are there any user-facing changes?
    
    No
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 .../physical_optimizer/partition_statistics.rs     | 103 +++++++++++++++++++++
 .../physical-plan/src/windows/window_agg_exec.rs   |  38 +++-----
 2 files changed, 118 insertions(+), 23 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 468d25e0e5..3f079b88d5 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -30,6 +30,7 @@ mod test {
     use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
     use datafusion_execution::TaskContext;
     use datafusion_execution::config::SessionConfig;
+    use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
     use datafusion_expr_common::operator::Operator;
     use datafusion_functions_aggregate::count::count_udaf;
     use datafusion_physical_expr::Partitioning;
@@ -52,6 +53,7 @@ mod test {
     use datafusion_physical_plan::repartition::RepartitionExec;
     use datafusion_physical_plan::sorts::sort::SortExec;
     use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
+    use datafusion_physical_plan::windows::{WindowAggExec, create_window_expr};
     use datafusion_physical_plan::{
         ExecutionPlan, ExecutionPlanProperties, execute_stream_partitioned,
         get_plan_string,
@@ -1154,4 +1156,105 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_window_agg() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+
+        let window_expr = create_window_expr(
+            &WindowFunctionDefinition::AggregateUDF(count_udaf()),
+            "count".to_owned(),
+            &[col("id", &scan.schema())?],
+            &[], // no partition by
+            &[PhysicalSortExpr::new(
+                col("id", &scan.schema())?,
+                SortOptions::default(),
+            )],
+            Arc::new(WindowFrame::new(Some(false))),
+            scan.schema(),
+            false,
+            false,
+            None,
+        )?;
+
+        let window_agg: Arc<dyn ExecutionPlan> =
+            Arc::new(WindowAggExec::try_new(vec![window_expr], scan, true)?);
+
+        // Verify partition statistics are properly propagated (not unknown)
+        let statistics = (0..window_agg.output_partitioning().partition_count())
+            .map(|idx| window_agg.partition_statistics(Some(idx)))
+            .collect::<Result<Vec<_>>>()?;
+
+        assert_eq!(statistics.len(), 2);
+
+        // Window functions preserve input row counts and column statistics
+        // but add unknown statistics for the new window column
+        let expected_statistic_partition_1 = Statistics {
+            num_rows: Precision::Exact(2),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_02,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_01,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                ColumnStatistics::new_unknown(), // window column
+            ],
+        };
+
+        let expected_statistic_partition_2 = Statistics {
+            num_rows: Precision::Exact(2),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
+                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_04,
+                    ))),
+                    min_value: Precision::Exact(ScalarValue::Date32(Some(
+                        DATE_2025_03_03,
+                    ))),
+                    sum_value: Precision::Absent,
+                    distinct_count: Precision::Absent,
+                    byte_size: Precision::Exact(8),
+                },
+                ColumnStatistics::new_unknown(), // window column
+            ],
+        };
+
+        assert_eq!(statistics[0], expected_statistic_partition_1);
+        assert_eq!(statistics[1], expected_statistic_partition_2);
+
+        // Verify the statistics match actual execution results
+        let expected_stats = vec![
+            ExpectedStatistics::NonEmpty(3, 4, 2),
+            ExpectedStatistics::NonEmpty(1, 2, 2),
+        ];
+        validate_statistics_with_data(window_agg, expected_stats, 0).await?;
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index d6d5f4fdd2..aa99f4f498 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -158,24 +158,6 @@ impl WindowAggExec {
                 .unwrap_or_else(Vec::new)
         }
     }
-
-    fn statistics_inner(&self) -> Result<Statistics> {
-        let input_stat = self.input.partition_statistics(None)?;
-        let win_cols = self.window_expr.len();
-        let input_cols = self.input.schema().fields().len();
-        // TODO stats: some windowing function will maintain invariants such as min, max...
-        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
-        // copy stats of the input to the beginning of the schema.
-        column_statistics.extend(input_stat.column_statistics);
-        for _ in 0..win_cols {
-            column_statistics.push(ColumnStatistics::new_unknown())
-        }
-        Ok(Statistics {
-            num_rows: input_stat.num_rows,
-            column_statistics,
-            total_byte_size: Precision::Absent,
-        })
-    }
 }
 
 impl DisplayAs for WindowAggExec {
@@ -291,15 +273,25 @@ impl ExecutionPlan for WindowAggExec {
     }
 
     fn statistics(&self) -> Result<Statistics> {
-        self.statistics_inner()
+        self.partition_statistics(None)
     }
 
     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
-        if partition.is_none() {
-            self.statistics_inner()
-        } else {
-            Ok(Statistics::new_unknown(&self.schema()))
+        let input_stat = self.input.partition_statistics(partition)?;
+        let win_cols = self.window_expr.len();
+        let input_cols = self.input.schema().fields().len();
+        // TODO stats: some windowing function will maintain invariants such as min, max...
+        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
+        // copy stats of the input to the beginning of the schema.
+        column_statistics.extend(input_stat.column_statistics);
+        for _ in 0..win_cols {
+            column_statistics.push(ColumnStatistics::new_unknown())
         }
+        Ok(Statistics {
+            num_rows: input_stat.num_rows,
+            column_statistics,
+            total_byte_size: Precision::Absent,
+        })
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to