This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8f396b8881 Push partition_statistics into DataSource (#18233)
8f396b8881 is described below
commit 8f396b888112bb90725343d250184f20afbacaba
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Sun Oct 26 19:11:17 2025 -0500
Push partition_statistics into DataSource (#18233)
Removes a downcast match in favor of use of the trait. This mirrors the
changes to DataSourceExec to use partition_statistics instead of
statistics from https://github.com/apache/datafusion/pull/15852
---
datafusion/datasource/src/file_scan_config.rs | 124 +++++++++++++++++++++++++-
datafusion/datasource/src/memory.rs | 28 ++++--
datafusion/datasource/src/source.rs | 32 +++----
3 files changed, 159 insertions(+), 25 deletions(-)
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 4dfb6a4ec3..695252803b 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -598,8 +598,39 @@ impl DataSource for FileScanConfig {
SchedulingType::Cooperative
}
- fn statistics(&self) -> Result<Statistics> {
- Ok(self.projected_stats())
+    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
+ if let Some(partition) = partition {
+ // Get statistics for a specific partition
+ if let Some(file_group) = self.file_groups.get(partition) {
+ if let Some(stat) = file_group.file_statistics(None) {
+ // Project the statistics based on the projection
+ let table_cols_stats = self
+ .projection_indices()
+ .into_iter()
+ .map(|idx| {
+ if idx < self.file_schema().fields().len() {
+ stat.column_statistics[idx].clone()
+ } else {
+                            // TODO provide accurate stat for partition column
+                            // See https://github.com/apache/datafusion/issues/1186
+ ColumnStatistics::new_unknown()
+ }
+ })
+ .collect();
+
+ return Ok(Statistics {
+ num_rows: stat.num_rows,
+ total_byte_size: stat.total_byte_size,
+ column_statistics: table_cols_stats,
+ });
+ }
+ }
+ // If no statistics available for this partition, return unknown
+ Ok(Statistics::new_unknown(&self.projected_schema()))
+ } else {
+ // Return aggregate statistics across all partitions
+ Ok(self.projected_stats())
+ }
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -1603,7 +1634,7 @@ mod tests {
);
let source_statistics = conf.file_source.statistics().unwrap();
- let conf_stats = conf.statistics().unwrap();
+ let conf_stats = conf.partition_statistics(None).unwrap();
// projection should be reflected in the file source statistics
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
@@ -2510,4 +2541,91 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_partition_statistics_projection() {
+    // This test verifies that partition_statistics applies projection correctly.
+    // The old implementation had a bug where it returned file group statistics
+    // without applying the projection, returning all column statistics instead
+    // of just the projected ones.
+
+ use crate::source::DataSourceExec;
+ use datafusion_physical_plan::ExecutionPlan;
+
+ // Create a schema with 4 columns
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("col0", DataType::Int32, false),
+ Field::new("col1", DataType::Int32, false),
+ Field::new("col2", DataType::Int32, false),
+ Field::new("col3", DataType::Int32, false),
+ ]));
+
+ // Create statistics for all 4 columns
+ let file_group_stats = Statistics {
+ num_rows: Precision::Exact(100),
+ total_byte_size: Precision::Exact(1024),
+ column_statistics: vec![
+ ColumnStatistics {
+ null_count: Precision::Exact(0),
+ ..ColumnStatistics::new_unknown()
+ },
+ ColumnStatistics {
+ null_count: Precision::Exact(5),
+ ..ColumnStatistics::new_unknown()
+ },
+ ColumnStatistics {
+ null_count: Precision::Exact(10),
+ ..ColumnStatistics::new_unknown()
+ },
+ ColumnStatistics {
+ null_count: Precision::Exact(15),
+ ..ColumnStatistics::new_unknown()
+ },
+ ],
+ };
+
+ // Create a file group with statistics
+        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
+            .with_statistics(Arc::new(file_group_stats));
+
+ // Create a FileScanConfig with projection: only keep columns 0 and 2
+ let config = FileScanConfigBuilder::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
+ Arc::clone(&schema),
+ Arc::new(MockSource::default()),
+ )
+ .with_projection(Some(vec![0, 2])) // Only project columns 0 and 2
+ .with_file_groups(vec![file_group])
+ .build();
+
+ // Create a DataSourceExec from the config
+ let exec = DataSourceExec::from_data_source(config);
+
+ // Get statistics for partition 0
+ let partition_stats = exec.partition_statistics(Some(0)).unwrap();
+
+        // Verify that only 2 columns are in the statistics (the projected ones)
+ assert_eq!(
+ partition_stats.column_statistics.len(),
+ 2,
+ "Expected 2 column statistics (projected), but got {}",
+ partition_stats.column_statistics.len()
+ );
+
+ // Verify the column statistics are for columns 0 and 2
+ assert_eq!(
+ partition_stats.column_statistics[0].null_count,
+ Precision::Exact(0),
+ "First projected column should be col0 with 0 nulls"
+ );
+ assert_eq!(
+ partition_stats.column_statistics[1].null_count,
+ Precision::Exact(10),
+ "Second projected column should be col2 with 10 nulls"
+ );
+
+ // Verify row count and byte size are preserved
+ assert_eq!(partition_stats.num_rows, Precision::Exact(100));
+ assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
+ }
}
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index eb55aa9b0b..7d5c8c4834 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -21,6 +21,7 @@ use std::collections::BinaryHeap;
use std::fmt;
use std::fmt::Debug;
use std::ops::Deref;
+use std::slice::from_ref;
use std::sync::Arc;
use crate::sink::DataSink;
@@ -192,12 +193,27 @@ impl DataSource for MemorySourceConfig {
SchedulingType::Cooperative
}
- fn statistics(&self) -> Result<Statistics> {
- Ok(common::compute_record_batch_statistics(
- &self.partitions,
- &self.schema,
- self.projection.clone(),
- ))
+    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
+ if let Some(partition) = partition {
+ // Compute statistics for a specific partition
+ if let Some(batches) = self.partitions.get(partition) {
+ Ok(common::compute_record_batch_statistics(
+ from_ref(batches),
+ &self.schema,
+ self.projection.clone(),
+ ))
+ } else {
+ // Invalid partition index
+ Ok(Statistics::new_unknown(&self.projected_schema))
+ }
+ } else {
+ // Compute statistics across all partitions
+ Ok(common::compute_record_batch_statistics(
+ &self.partitions,
+ &self.schema,
+ self.projection.clone(),
+ ))
+ }
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 20d9a1d6e5..11a8a3867b 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -151,7 +151,21 @@ pub trait DataSource: Send + Sync + Debug {
fn scheduling_type(&self) -> SchedulingType {
SchedulingType::NonCooperative
}
- fn statistics(&self) -> Result<Statistics>;
+
+ /// Returns statistics for a specific partition, or aggregate statistics
+ /// across all partitions if `partition` is `None`.
+    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
+
+ /// Returns aggregate statistics across all partitions.
+ ///
+ /// # Deprecated
+    /// Use [`Self::partition_statistics`] instead, which provides more fine-grained
+    /// control over statistics retrieval (per-partition or aggregate).
+ #[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
+ fn statistics(&self) -> Result<Statistics> {
+ self.partition_statistics(None)
+ }
+
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
@@ -285,21 +299,7 @@ impl ExecutionPlan for DataSourceExec {
}
     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
- if let Some(partition) = partition {
- let mut statistics = Statistics::new_unknown(&self.schema());
-            if let Some(file_config) =
-                self.data_source.as_any().downcast_ref::<FileScanConfig>()
-            {
-                if let Some(file_group) = file_config.file_groups.get(partition) {
- if let Some(stat) = file_group.file_statistics(None) {
- statistics = stat.clone();
- }
- }
- }
- Ok(statistics)
- } else {
- Ok(self.data_source.statistics()?)
- }
+ self.data_source.partition_statistics(partition)
}
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]