This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 14f34f641e fix: preserve byte-size statistics in AggregateExec (#18885)
14f34f641e is described below

commit 14f34f641e6c11ca3719736db6cbd63a063ea8a5
Author: Tamar <[email protected]>
AuthorDate: Tue Nov 25 23:30:33 2025 +0200

    fix: preserve byte-size statistics in AggregateExec (#18885)
    
    Previously, AggregateExec dropped total_byte_size statistics
    (returning Precision::Absent) across aggregation operations,
    preventing the optimizer from making informed decisions about memory
    allocation and execution strategies (e.g. join-side selection and
    dynamic filters).
    
    This commit implements proportional byte-size scaling based on row count
    ratios:
    - Added calculate_scaled_byte_size helper with inline optimization
    - Scales byte size for Final/FinalPartitioned without GROUP BY
    - Scales byte size proportionally for all other aggregation modes
    - Always returns Precision::Inexact for estimates (semantically correct)
    - Returns Precision::Absent when input statistics are insufficient
    
    Added test coverage for edge cases (absent statistics, zero rows).
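
    For illustration, a minimal self-contained sketch of the scaling
    heuristic (assumption: the enum and free function below are
    simplified stand-ins, not DataFusion's actual
    `datafusion_common::stats::Precision` or the private
    `AggregateExec::calculate_scaled_byte_size` method):

    ```rust
    /// Simplified stand-in for DataFusion's `Precision<usize>`.
    #[derive(Debug, PartialEq)]
    enum Precision {
        Exact(usize),
        Inexact(usize),
        Absent,
    }

    impl Precision {
        fn get_value(&self) -> Option<usize> {
            match self {
                Precision::Exact(v) | Precision::Inexact(v) => Some(*v),
                Precision::Absent => None,
            }
        }
    }

    /// Scale the input byte size by the output/input row ratio,
    /// assuming uniform row sizes.
    fn scaled_byte_size(rows: &Precision, bytes: &Precision, target_rows: usize) -> Precision {
        match (rows.get_value(), bytes.get_value()) {
            // Guard against division by zero: require at least one input row.
            (Some(input_rows), Some(input_bytes)) if input_rows > 0 => {
                let bytes_per_row = input_bytes as f64 / input_rows as f64;
                // Estimates are always reported as Inexact, never Exact.
                Precision::Inexact((bytes_per_row * target_rows as f64).ceil() as usize)
            }
            _ => Precision::Absent,
        }
    }

    fn main() {
        // 1000 rows over 50_000 bytes = 50 bytes/row; 20 output rows -> 1000 bytes.
        let est = scaled_byte_size(&Precision::Exact(1000), &Precision::Exact(50_000), 20);
        assert_eq!(est, Precision::Inexact(1000));
        // Zero input rows: no bytes-per-row ratio, so the estimate is Absent.
        let none = scaled_byte_size(&Precision::Exact(0), &Precision::Exact(0), 5);
        assert_eq!(none, Precision::Absent);
    }
    ```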
    
    ## Which issue does this PR close?
    https://github.com/apache/datafusion/issues/18850
    
    - Closes #18850
    
    ## Rationale for this change
    Without byte-size statistics, the optimizer cannot estimate memory
    requirements for join-side selection, dynamic filter generation, or
    memory allocation. This change preserves those statistics using
    proportional scaling (bytes_per_row × output_rows).
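
    For example, if a Final aggregation with no GROUP BY reads an input
    reporting 100 rows totaling 8,000 bytes (80 bytes per row), its
    single output row is estimated at 1 × 80 = 80 bytes and reported as
    Precision::Inexact(80).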
    
    ## What changes are included in this PR?
    1. Modified `statistics_inner` to calculate proportional byte size
    instead of returning `Precision::Absent`
    2. Added `calculate_scaled_byte_size` helper (marked `#[inline]`,
    guards against division by zero)
    3. Updated test assertions and added edge case coverage
    
    ## Are these changes tested?
    Yes:
    - New `test_aggregate_statistics_edge_cases` covers edge-case scenarios
    - Existing tests confirm stats propagate correctly through the
    aggregation pipeline
    
    ## Are there any user-facing changes?
    No breaking changes.
    This is an internal optimization that may improve query planning and
    produce more accurate memory estimates in EXPLAIN output.
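
    As a usage sketch, downstream code can read the propagated estimate
    through `partition_statistics`, the same API the new test exercises
    (the `estimated_output_bytes` helper below is hypothetical):

    ```rust
    use std::sync::Arc;

    use datafusion_common::Result;
    use datafusion_physical_plan::ExecutionPlan;

    /// Return a plan's estimated output size in bytes, if statistics allow.
    fn estimated_output_bytes(plan: &Arc<dyn ExecutionPlan>) -> Result<Option<usize>> {
        // `None` asks for statistics across all partitions.
        let stats = plan.partition_statistics(None)?;
        // After this change, AggregateExec reports a scaled byte-size
        // estimate here instead of Precision::Absent whenever its input
        // supplies both row-count and byte-size statistics.
        Ok(stats.total_byte_size.get_value().copied())
    }
    ```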
    
    Co-authored-by: Daniël Heres <[email protected]>
---
 datafusion/core/tests/dataframe/mod.rs             | 102 +++++++++---------
 .../physical_optimizer/partition_statistics.rs     |   4 +-
 datafusion/physical-plan/src/aggregates/mod.rs     | 118 ++++++++++++++++++++-
 datafusion/sqllogictest/test_files/union.slt       |  20 ++--
 4 files changed, 182 insertions(+), 62 deletions(-)

diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index fb6dc3bcba..0c6ccf1b07 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -3325,30 +3325,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
     assert_snapshot!(
         pretty_format_batches(&sql_results).unwrap(),
         @r"
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | plan_type     | plan                                                                                                                    |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | logical_plan  | Projection: t1.a, t1.b                                                                                                  |
-    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)        |
-    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                         |
-    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                 |
-    |               |         TableScan: t1 projection=[a, b]                                                                                 |
-    |               |         SubqueryAlias: __scalar_sq_1                                                                                    |
-    |               |           Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true                                 |
-    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]]                                                       |
-    |               |               TableScan: t2 projection=[a]                                                                              |
-    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                     |
-    |               |   CoalesceBatchesExec: target_batch_size=8192                                                                           |
-    |               |     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
-    |               |       DataSourceExec: partitions=1, partition_sizes=[1]                                                                 |
-    |               |       ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true]                             |
-    |               |         AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]                                    |
-    |               |           CoalesceBatchesExec: target_batch_size=8192                                                                   |
-    |               |             RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                            |
-    |               |               AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]                                       |
-    |               |                 DataSourceExec: partitions=1, partition_sizes=[1]                                                       |
-    |               |                                                                                                                         |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                                                                                         |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | Projection: t1.a, t1.b                                                                                                       |
+    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)             |
+    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                              |
+    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                      |
+    |               |         TableScan: t1 projection=[a, b]                                                                                      |
+    |               |         SubqueryAlias: __scalar_sq_1                                                                                         |
+    |               |           Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true                                      |
+    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]]                                                            |
+    |               |               TableScan: t2 projection=[a]                                                                                   |
+    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                          |
+    |               |   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1                                                       |
+    |               |     ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true]                      |
+    |               |       CoalesceBatchesExec: target_batch_size=8192                                                                            |
+    |               |         HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+    |               |           CoalescePartitionsExec                                                                                             |
+    |               |             ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true]                            |
+    |               |               AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]                                   |
+    |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                  |
+    |               |                   RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                           |
+    |               |                     AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]                                      |
+    |               |                       DataSourceExec: partitions=1, partition_sizes=[1]                                                      |
+    |               |           DataSourceExec: partitions=1, partition_sizes=[1]                                                                  |
+    |               |                                                                                                                              |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
     "
     );
 
@@ -3380,30 +3383,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
     assert_snapshot!(
         pretty_format_batches(&df_results).unwrap(),
         @r"
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | plan_type     | plan                                                                                                                    |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
-    | logical_plan  | Projection: t1.a, t1.b                                                                                                  |
-    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)        |
-    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                         |
-    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                 |
-    |               |         TableScan: t1 projection=[a, b]                                                                                 |
-    |               |         SubqueryAlias: __scalar_sq_1                                                                                    |
-    |               |           Projection: count(*), t2.a, Boolean(true) AS __always_true                                                    |
-    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]]                                           |
-    |               |               TableScan: t2 projection=[a]                                                                              |
-    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                     |
-    |               |   CoalesceBatchesExec: target_batch_size=8192                                                                           |
-    |               |     HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
-    |               |       DataSourceExec: partitions=1, partition_sizes=[1]                                                                 |
-    |               |       ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true]                                    |
-    |               |         AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]                                           |
-    |               |           CoalesceBatchesExec: target_batch_size=8192                                                                   |
-    |               |             RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                            |
-    |               |               AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]                                              |
-    |               |                 DataSourceExec: partitions=1, partition_sizes=[1]                                                       |
-    |               |                                                                                                                         |
-    +---------------+-------------------------------------------------------------------------------------------------------------------------+
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | plan_type     | plan                                                                                                                         |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
+    | logical_plan  | Projection: t1.a, t1.b                                                                                                       |
+    |               |   Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0)             |
+    |               |     Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true                                              |
+    |               |       Left Join: t1.a = __scalar_sq_1.a                                                                                      |
+    |               |         TableScan: t1 projection=[a, b]                                                                                      |
+    |               |         SubqueryAlias: __scalar_sq_1                                                                                         |
+    |               |           Projection: count(*), t2.a, Boolean(true) AS __always_true                                                         |
+    |               |             Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]]                                                |
+    |               |               TableScan: t2 projection=[a]                                                                                   |
+    | physical_plan | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1]                          |
+    |               |   RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1                                                       |
+    |               |     ProjectionExec: expr=[a@2 as a, b@3 as b, count(*)@0 as count(*), __always_true@1 as __always_true]                      |
+    |               |       CoalesceBatchesExec: target_batch_size=8192                                                                            |
+    |               |         HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@1, a@0)], projection=[count(*)@0, __always_true@2, a@3, b@4] |
+    |               |           CoalescePartitionsExec                                                                                             |
+    |               |             ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true]                                   |
+    |               |               AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]                                          |
+    |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                  |
+    |               |                   RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1                                           |
+    |               |                     AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]                                             |
+    |               |                       DataSourceExec: partitions=1, partition_sizes=[1]                                                      |
+    |               |           DataSourceExec: partitions=1, partition_sizes=[1]                                                                  |
+    |               |                                                                                                                              |
+    +---------------+------------------------------------------------------------------------------------------------------------------------------+
     "
     );
 
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 7034b71fd5..7045cb8ea1 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -627,7 +627,7 @@ mod test {
 
         let expected_p0_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Absent,
+            total_byte_size: Precision::Inexact(110),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
@@ -645,7 +645,7 @@ mod test {
 
         let expected_p1_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Absent,
+            total_byte_size: Precision::Inexact(110),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 6bf59fd3d3..f175fd4cdb 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -792,10 +792,13 @@ impl AggregateExec {
             AggregateMode::Final | AggregateMode::FinalPartitioned
                 if self.group_by.expr.is_empty() =>
             {
+                let total_byte_size =
+                    Self::calculate_scaled_byte_size(child_statistics, 1);
+
                 Ok(Statistics {
                     num_rows: Precision::Exact(1),
                     column_statistics,
-                    total_byte_size: Precision::Absent,
+                    total_byte_size,
                 })
             }
             _ => {
@@ -815,14 +818,48 @@ impl AggregateExec {
                 } else {
                     Precision::Absent
                 };
+
+                let total_byte_size = num_rows
+                    .get_value()
+                    .and_then(|&output_rows| {
+                        Self::calculate_scaled_byte_size(child_statistics, output_rows)
+                            .get_value()
+                            .map(|&bytes| Precision::Inexact(bytes))
+                    })
+                    .unwrap_or(Precision::Absent);
+
                 Ok(Statistics {
                     num_rows,
                     column_statistics,
-                    total_byte_size: Precision::Absent,
+                    total_byte_size,
                 })
             }
         }
     }
+
+    /// Calculate scaled byte size based on row count ratio.
+    /// Returns `Precision::Absent` if input statistics are insufficient.
+    /// Returns `Precision::Inexact` with the scaled value otherwise.
+    ///
+    /// This is a simple heuristic that assumes uniform row sizes.
+    #[inline]
+    fn calculate_scaled_byte_size(
+        input_stats: &Statistics,
+        target_row_count: usize,
+    ) -> Precision<usize> {
+        match (
+            input_stats.num_rows.get_value(),
+            input_stats.total_byte_size.get_value(),
+        ) {
+            (Some(&input_rows), Some(&input_bytes)) if input_rows > 0 => {
+                let bytes_per_row = input_bytes as f64 / input_rows as f64;
+                let scaled_bytes =
+                    (bytes_per_row * target_row_count as f64).ceil() as usize;
+                Precision::Inexact(scaled_bytes)
+            }
+            _ => Precision::Absent,
+        }
+    }
 }
 
 impl DisplayAs for AggregateExec {
@@ -1921,6 +1958,10 @@ mod tests {
             input_schema,
         )?);
 
+        // Verify statistics are preserved proportionally through aggregation
+        let final_stats = merged_aggregate.partition_statistics(None)?;
+        assert!(final_stats.total_byte_size.get_value().is_some());
+
         let task_ctx = if spill {
             // enlarge memory limit to let the final aggregation finish
             new_spill_ctx(2, 2600)
@@ -3146,4 +3187,77 @@ mod tests {
         run_test_with_spill_pool_if_necessary(20_000, false).await?;
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_aggregate_statistics_edge_cases() -> Result<()> {
+        use crate::test::exec::StatisticsExec;
+        use datafusion_common::ColumnStatistics;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        // Test 1: Absent statistics remain absent
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(100),
+                total_byte_size: Precision::Absent,
+                column_statistics: vec![
+                    ColumnStatistics::new_unknown(),
+                    ColumnStatistics::new_unknown(),
+                ],
+            },
+            (*schema).clone(),
+        )) as Arc<dyn ExecutionPlan>;
+
+        let agg = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("COUNT(a)")
+                    .build()?,
+            )],
+            vec![None],
+            input,
+            Arc::clone(&schema),
+        )?);
+
+        let stats = agg.partition_statistics(None)?;
+        assert_eq!(stats.total_byte_size, Precision::Absent);
+
+        // Test 2: Zero rows returns Absent (can't estimate output size from zero input)
+        let input_zero = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Exact(0),
+                total_byte_size: Precision::Exact(0),
+                column_statistics: vec![
+                    ColumnStatistics::new_unknown(),
+                    ColumnStatistics::new_unknown(),
+                ],
+            },
+            (*schema).clone(),
+        )) as Arc<dyn ExecutionPlan>;
+
+        let agg_zero = Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![Arc::new(
+                AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
+                    .schema(Arc::clone(&schema))
+                    .alias("COUNT(a)")
+                    .build()?,
+            )],
+            vec![None],
+            input_zero,
+            Arc::clone(&schema),
+        )?);
+
+        let stats_zero = agg_zero.partition_statistics(None)?;
+        assert_eq!(stats_zero.total_byte_size, Precision::Absent);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index 9e63f79f45..c20598239c 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -307,17 +307,17 @@ logical_plan
 physical_plan
 01)UnionExec
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, CAST(t2.id AS Int32)@2), (name@1, name@1)], NullsEqual: true
+03)----HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], NullsEqual: true
 04)------CoalescePartitionsExec
-05)--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
-12)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-13)----------DataSourceExec: partitions=1, partition_sizes=[1]
+05)--------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) as CAST(t2.id AS Int32)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as name], aggr=[]
+09)--------CoalesceBatchesExec: target_batch_size=2
+10)----------RepartitionExec: partitioning=Hash([id@0, name@1], 4), input_partitions=4
+11)------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], aggr=[]
+12)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)----------------DataSourceExec: partitions=1, partition_sizes=[1]
 14)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
 15)----CoalesceBatchesExec: target_batch_size=2
 16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1], NullsEqual: true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
