This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cc670e8e88 Better document the relationship between `FileFormat::projection` / `FileFormat::filter` and `FileScanConfig::Statistics`  (#20188)
cc670e8e88 is described below

commit cc670e8e88af32eefb2a5500c9bd282fc35eada5
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Feb 9 08:57:01 2026 -0500

    Better document the relationship between `FileFormat::projection` / `FileFormat::filter` and `FileScanConfig::Statistics`  (#20188)
    
    ## Which issue does this PR close?
    
    - Part of https://github.com/apache/datafusion/issues/20173

    ## Rationale for this change
    
    I am debugging an issue related to the interplay of pre-existing
    orderings, filtering, and projections in FileScanConfig. As part of that
    I am trying to understand how `Statistics` are handled by
    `FileScanConfig` -- specifically, when the projection and filtering are
    applied relative to the statistics.
    
    After some study, I have found that the statistics are (supposed?) to be
    computed *before* applying the Filter and Projection from the scan, so
    let's document that better. I also found the schemas involved to be a
    bit confusing.
    
    I would also like to use this PR to validate my understanding of the
    intended design.
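
    As a rough illustration of that understanding, here is a minimal sketch
    (not code from this PR; the exact builder methods, return types, and
    import paths used below are assumptions):

    ```rust
    use datafusion_common::{Result, Statistics};
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;

    /// `builder` and `table_stats` are hypothetical inputs: a pre-configured
    /// builder for the scan and statistics describing the *unprojected*
    /// table (file schema plus partition columns).
    fn build_scan(builder: FileScanConfigBuilder, table_stats: Statistics) -> Result<()> {
        let config = builder
            // Statistics describe the full table, *before* any projection or
            // filtering is applied by the file source.
            .with_statistics(table_stats)
            // Projection indices also refer to the unprojected table schema.
            .with_projection_indices(Some(vec![0, 2]))?
            .build();

        // When read back, the statistics have the projection applied; if a
        // filter was pushed into the `FileSource`, they are marked inexact.
        let _projected = config.statistics();
        Ok(())
    }
    ```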
    
    ## What changes are included in this PR?
    
    Update documentation
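
    For reference, the updated docs describe a contract where filters and
    projections handed to a `FileSource` are expressed against the
    unprojected table schema (file columns plus partition columns). A
    minimal sketch of what that means (an illustration only; the schema and
    expression helpers used here are assumptions, not code from this PR):

    ```rust
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};

    fn example_filter() -> Result<()> {
        // Unprojected table schema: file columns plus a partition column.
        let table_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("part", DataType::Utf8, false),
        ]);

        // A filter pushed into the `FileSource` is written against this
        // unprojected schema, so it may reference `part` even if the output
        // projection later keeps only `a`.
        let _filter = Arc::new(BinaryExpr::new(
            col("part", &table_schema)?,
            Operator::Eq,
            lit(ScalarValue::from("2026-02-09")),
        ));
        Ok(())
    }
    ```
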
    ## Are these changes tested?
    
    by CI
    
    ## Are there any user-facing changes?
    
    Just documentation changes, no functional changes
---
 datafusion/datasource/src/file.rs              | 41 ++++++++++++++++--
 datafusion/datasource/src/file_scan_config.rs  | 59 ++++++++++++++++++++------
 datafusion/datasource/src/source.rs            |  6 +++
 datafusion/physical-plan/src/execution_plan.rs |  1 +
 4 files changed, 90 insertions(+), 17 deletions(-)

diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index c6282c3c7c..b41a456f1f 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -46,6 +46,12 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
 
 /// file format specific behaviors for elements in [`DataSource`]
 ///
+/// # Schema information
+/// There are two important schemas for a [`FileSource`]:
+/// 1. [`Self::table_schema`] -- the schema for the overall "table"
+/// 2. The logical output schema, comprised of [`Self::table_schema`] with
+///    [`Self::projection`] applied
+///
 /// See more details on specific implementations:
 /// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
 /// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
@@ -64,24 +70,38 @@ pub trait FileSource: Send + Sync {
     ) -> Result<Arc<dyn FileOpener>>;
     /// Any
     fn as_any(&self) -> &dyn Any;
+
     /// Returns the table schema for this file source.
     ///
-    /// This always returns the unprojected schema (the full schema of the data).
+    /// This always returns the unprojected schema (the full schema of the data)
+    /// without [`Self::projection`] applied.
+    ///
+    /// The output schema of this `FileSource` is this TableSchema
+    /// with [`Self::projection`] applied.
     fn table_schema(&self) -> &crate::table_schema::TableSchema;
+
     /// Initialize new type with batch size configuration
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
-    /// Returns the filter expression that will be applied during the file scan.
+
+    /// Returns the filter expression that will be applied *during* the file scan.
+    ///
+    /// These expressions are in terms of the unprojected [`Self::table_schema`].
     fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
         None
     }
-    /// Return the projection that will be applied to the output stream on top of the table schema.
+
+    /// Return the projection that will be applied to the output stream on top
+    /// of [`Self::table_schema`].
     fn projection(&self) -> Option<&ProjectionExprs> {
         None
     }
+
     /// Return execution plan metrics
     fn metrics(&self) -> &ExecutionPlanMetricsSet;
+
     /// String representation of file source such as "csv", "json", "parquet"
     fn file_type(&self) -> &str;
+
     /// Format FileType specific information
     fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
         Ok(())
@@ -135,6 +155,19 @@ pub trait FileSource: Send + Sync {
     }
 
     /// Try to push down filters into this FileSource.
+    ///
+    /// `filters` must be in terms of the unprojected table schema (file schema
+    /// plus partition columns), before any projection is applied.
+    ///
+    /// Any filters that this FileSource chooses to evaluate itself should be
+    /// returned as `PushedDown::Yes` in the result, along with a FileSource
+    /// instance that incorporates those filters. Such filters are logically
+    /// applied "during" the file scan, meaning they may refer to columns not
+    /// included in the final output projection.
+    ///
+    /// Filters that cannot be pushed down should be marked as `PushedDown::No`,
+    /// and will be evaluated by an execution plan after the file source.
+    ///
     /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
     /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
@@ -220,7 +253,7 @@ pub trait FileSource: Send + Sync {
         Ok(SortOrderPushdownResult::Unsupported)
     }
 
-    /// Try to push down a projection into a this FileSource.
+    /// Try to push down a projection into this FileSource.
     ///
     /// `FileSource` implementations that support projection pushdown should
     /// override this method and return a new `FileSource` instance with the
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index fe78c0e526..80722755e6 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -55,10 +55,21 @@ use datafusion_physical_plan::{
 use log::{debug, warn};
 use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
 
-/// The base configurations for a [`DataSourceExec`], the a physical plan for
-/// any given file format.
+/// [`FileScanConfig`] represents scanning data from a group of files
 ///
-/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
+/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan
+/// for scanning files with a particular file format.
+///
+/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible
+/// for creating the actual execution plan to read the files based on a
+/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent
+/// information about the files **before** any projection or filtering is
+/// applied in the file source.
+///
+/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`.
+///
+/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from
+/// a `FileScanConfig`.
 ///
 /// # Example
 /// ```
@@ -169,8 +180,11 @@ pub struct FileScanConfig {
     /// Expression adapter used to adapt filters and projections that are pushed down into the scan
     /// from the logical schema to the physical schema of the file.
     pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-    /// Unprojected statistics for the table (file schema + partition columns).
-    /// These are projected on-demand via `projected_stats()`.
+    /// Statistics for the entire table (file schema + partition columns).
+    /// See [`FileScanConfigBuilder::with_statistics`] for more details.
+    ///
+    /// The effective statistics are computed on-demand via
+    /// [`ProjectionExprs::project_statistics`].
     ///
     /// Note that this field is pub(crate) because accessing it directly from outside
     /// would be incorrect if there are filters being applied, thus this should be accessed
@@ -283,17 +297,20 @@ impl FileScanConfigBuilder {
         }
     }
 
-    /// Set the maximum number of records to read from this plan. If `None`,
-    /// all records after filtering are returned.
+    /// Set the maximum number of records to read from this plan.
+    ///
+    /// If `None`, all records after filtering are returned.
     pub fn with_limit(mut self, limit: Option<usize>) -> Self {
         self.limit = limit;
         self
     }
 
     /// Set whether the limit should be order-sensitive.
+    ///
     /// When `true`, files must be read in the exact order specified to produce
     /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
-    /// DataFusion may reorder file processing for optimization without affecting correctness.
+    /// DataFusion may reorder file processing for optimization without
+    /// affecting correctness.
     pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
         self.preserve_order = order_sensitive;
         self
@@ -301,13 +318,14 @@ impl FileScanConfigBuilder {
 
     /// Set the file source for scanning files.
     ///
-    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
-    /// after the builder has been created.
+    /// This method allows you to change the file source implementation (e.g.
+    /// ParquetSource, CsvSource, etc.) after the builder has been created.
     pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
         self.file_source = file_source;
         self
     }
 
+    /// Return the table schema
     pub fn table_schema(&self) -> &SchemaRef {
         self.file_source.table_schema().table_schema()
     }
@@ -332,7 +350,12 @@ impl FileScanConfigBuilder {
 
     /// Set the columns on which to project the data using column indices.
     ///
-    /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
+    /// This method attempts to push down the projection to the underlying file
+    /// source if supported. If the file source does not support projection
+    /// pushdown, an error is returned.
+    ///
+    /// Indexes that are higher than the number of columns of `file_schema`
+    /// refer to `table_partition_cols`.
     pub fn with_projection_indices(
         mut self,
         indices: Option<Vec<usize>>,
@@ -371,8 +394,18 @@ impl FileScanConfigBuilder {
         self
     }
 
-    /// Set the estimated overall statistics of the files, taking `filters` into account.
-    /// Defaults to [`Statistics::new_unknown`].
+    /// Set the statistics of the files, including partition
+    /// columns. Defaults to [`Statistics::new_unknown`].
+    ///
+    /// These statistics are for the entire table (file schema + partition
+    /// columns) before any projection or filtering is applied. Projections are
+    /// applied when statistics are retrieved, and if a filter is present,
+    /// [`FileScanConfig::statistics`] will mark the statistics as inexact
+    /// (counts are not adjusted).
+    ///
+    /// Projections and filters may be applied by the file source, either by
+    /// [`Self::with_projection_indices`] or a preexisting
+    /// [`FileSource::projection`] or [`FileSource::filter`].
     pub fn with_statistics(mut self, statistics: Statistics) -> Self {
         self.statistics = Some(statistics);
         self
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index de18b6be22..71ddac84a8 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -178,7 +178,13 @@ pub trait DataSource: Send + Sync + Debug {
         &self,
         _projection: &ProjectionExprs,
     ) -> Result<Option<Arc<dyn DataSource>>>;
+
     /// Try to push down filters into this DataSource.
+    ///
+    /// These filters are in terms of the output schema of this DataSource (e.g.
+    /// [`Self::eq_properties`] and output of any projections pushed into the
+    /// source), not the original table schema.
+    ///
     /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
     /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 52f4829127..43cce0e5ea 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -577,6 +577,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 
     /// Handle the result of a child pushdown.
+    ///
     /// This method is called as we recurse back up the plan tree after pushing
     /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
     /// It allows the current node to process the results of filter pushdown from


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
