This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f396b8881 Push partition_statistics into DataSource (#18233)
8f396b8881 is described below

commit 8f396b888112bb90725343d250184f20afbacaba
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Sun Oct 26 19:11:17 2025 -0500

    Push partition_statistics into DataSource (#18233)
    
    Removes a downcast match in favor of use of the trait. This mirrors the
    changes to DataSourceExec to use partition_statistics instead of
    statistics from https://github.com/apache/datafusion/pull/15852
---
 datafusion/datasource/src/file_scan_config.rs | 124 +++++++++++++++++++++++++-
 datafusion/datasource/src/memory.rs           |  28 ++++--
 datafusion/datasource/src/source.rs           |  32 +++----
 3 files changed, 159 insertions(+), 25 deletions(-)

diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 4dfb6a4ec3..695252803b 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -598,8 +598,39 @@ impl DataSource for FileScanConfig {
         SchedulingType::Cooperative
     }
 
-    fn statistics(&self) -> Result<Statistics> {
-        Ok(self.projected_stats())
+    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
+        if let Some(partition) = partition {
+            // Get statistics for a specific partition
+            if let Some(file_group) = self.file_groups.get(partition) {
+                if let Some(stat) = file_group.file_statistics(None) {
+                    // Project the statistics based on the projection
+                    let table_cols_stats = self
+                        .projection_indices()
+                        .into_iter()
+                        .map(|idx| {
+                            if idx < self.file_schema().fields().len() {
+                                stat.column_statistics[idx].clone()
+                            } else {
+                                // TODO provide accurate stat for partition column
+                                // See https://github.com/apache/datafusion/issues/1186
+                                ColumnStatistics::new_unknown()
+                            }
+                        })
+                        .collect();
+
+                    return Ok(Statistics {
+                        num_rows: stat.num_rows,
+                        total_byte_size: stat.total_byte_size,
+                        column_statistics: table_cols_stats,
+                    });
+                }
+            }
+            // If no statistics available for this partition, return unknown
+            Ok(Statistics::new_unknown(&self.projected_schema()))
+        } else {
+            // Return aggregate statistics across all partitions
+            Ok(self.projected_stats())
+        }
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -1603,7 +1634,7 @@ mod tests {
         );
 
         let source_statistics = conf.file_source.statistics().unwrap();
-        let conf_stats = conf.statistics().unwrap();
+        let conf_stats = conf.partition_statistics(None).unwrap();
 
         // projection should be reflected in the file source statistics
         assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
@@ -2510,4 +2541,91 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_partition_statistics_projection() {
+        // This test verifies that partition_statistics applies projection correctly.
+        // The old implementation had a bug where it returned file group statistics
+        // without applying the projection, returning all column statistics instead
+        // of just the projected ones.
+
+        use crate::source::DataSourceExec;
+        use datafusion_physical_plan::ExecutionPlan;
+
+        // Create a schema with 4 columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col0", DataType::Int32, false),
+            Field::new("col1", DataType::Int32, false),
+            Field::new("col2", DataType::Int32, false),
+            Field::new("col3", DataType::Int32, false),
+        ]));
+
+        // Create statistics for all 4 columns
+        let file_group_stats = Statistics {
+            num_rows: Precision::Exact(100),
+            total_byte_size: Precision::Exact(1024),
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    ..ColumnStatistics::new_unknown()
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(5),
+                    ..ColumnStatistics::new_unknown()
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(10),
+                    ..ColumnStatistics::new_unknown()
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(15),
+                    ..ColumnStatistics::new_unknown()
+                },
+            ],
+        };
+
+        // Create a file group with statistics
+        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
+            .with_statistics(Arc::new(file_group_stats));
+
+        // Create a FileScanConfig with projection: only keep columns 0 and 2
+        let config = FileScanConfigBuilder::new(
+            ObjectStoreUrl::parse("test:///").unwrap(),
+            Arc::clone(&schema),
+            Arc::new(MockSource::default()),
+        )
+        .with_projection(Some(vec![0, 2])) // Only project columns 0 and 2
+        .with_file_groups(vec![file_group])
+        .build();
+
+        // Create a DataSourceExec from the config
+        let exec = DataSourceExec::from_data_source(config);
+
+        // Get statistics for partition 0
+        let partition_stats = exec.partition_statistics(Some(0)).unwrap();
+
+        // Verify that only 2 columns are in the statistics (the projected ones)
+        assert_eq!(
+            partition_stats.column_statistics.len(),
+            2,
+            "Expected 2 column statistics (projected), but got {}",
+            partition_stats.column_statistics.len()
+        );
+
+        // Verify the column statistics are for columns 0 and 2
+        assert_eq!(
+            partition_stats.column_statistics[0].null_count,
+            Precision::Exact(0),
+            "First projected column should be col0 with 0 nulls"
+        );
+        assert_eq!(
+            partition_stats.column_statistics[1].null_count,
+            Precision::Exact(10),
+            "Second projected column should be col2 with 10 nulls"
+        );
+
+        // Verify row count and byte size are preserved
+        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
+        assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
+    }
 }
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index eb55aa9b0b..7d5c8c4834 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -21,6 +21,7 @@ use std::collections::BinaryHeap;
 use std::fmt;
 use std::fmt::Debug;
 use std::ops::Deref;
+use std::slice::from_ref;
 use std::sync::Arc;
 
 use crate::sink::DataSink;
@@ -192,12 +193,27 @@ impl DataSource for MemorySourceConfig {
         SchedulingType::Cooperative
     }
 
-    fn statistics(&self) -> Result<Statistics> {
-        Ok(common::compute_record_batch_statistics(
-            &self.partitions,
-            &self.schema,
-            self.projection.clone(),
-        ))
+    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
+        if let Some(partition) = partition {
+            // Compute statistics for a specific partition
+            if let Some(batches) = self.partitions.get(partition) {
+                Ok(common::compute_record_batch_statistics(
+                    from_ref(batches),
+                    &self.schema,
+                    self.projection.clone(),
+                ))
+            } else {
+                // Invalid partition index
+                Ok(Statistics::new_unknown(&self.projected_schema))
+            }
+        } else {
+            // Compute statistics across all partitions
+            Ok(common::compute_record_batch_statistics(
+                &self.partitions,
+                &self.schema,
+                self.projection.clone(),
+            ))
+        }
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 20d9a1d6e5..11a8a3867b 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -151,7 +151,21 @@ pub trait DataSource: Send + Sync + Debug {
     fn scheduling_type(&self) -> SchedulingType {
         SchedulingType::NonCooperative
     }
-    fn statistics(&self) -> Result<Statistics>;
+
+    /// Returns statistics for a specific partition, or aggregate statistics
+    /// across all partitions if `partition` is `None`.
+    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
+
+    /// Returns aggregate statistics across all partitions.
+    ///
+    /// # Deprecated
+    /// Use [`Self::partition_statistics`] instead, which provides more fine-grained
+    /// control over statistics retrieval (per-partition or aggregate).
+    #[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
+    fn statistics(&self) -> Result<Statistics> {
+        self.partition_statistics(None)
+    }
+
     /// Return a copy of this DataSource with a new fetch limit
     fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
     fn fetch(&self) -> Option<usize>;
@@ -285,21 +299,7 @@ impl ExecutionPlan for DataSourceExec {
     }
 
     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
-        if let Some(partition) = partition {
-            let mut statistics = Statistics::new_unknown(&self.schema());
-            if let Some(file_config) =
-                self.data_source.as_any().downcast_ref::<FileScanConfig>()
-            {
-                if let Some(file_group) = file_config.file_groups.get(partition) {
-                    if let Some(stat) = file_group.file_statistics(None) {
-                        statistics = stat.clone();
-                    }
-                }
-            }
-            Ok(statistics)
-        } else {
-            Ok(self.data_source.statistics()?)
-        }
+        self.data_source.partition_statistics(partition)
     }
 
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to