This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cc670e8e88 Better document the relationship between
`FileFormat::projection` / `FileFormat::filter` and
`FileScanConfig::Statistics` (#20188)
cc670e8e88 is described below
commit cc670e8e88af32eefb2a5500c9bd282fc35eada5
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Feb 9 08:57:01 2026 -0500
Better document the relationship between `FileFormat::projection` /
`FileFormat::filter` and `FileScanConfig::Statistics` (#20188)
## Which issue does this PR close?
- Part of https://github.com/apache/datafusion/issues/20173
## Rationale for this change
I am debugging an issue related to the interplay of pre-existing
orderings, filtering and projections in FileScanConfig. As part of that
I am trying to understand how `Statistics` are handled by
`FileScanConfig` -- specifically, at what point, relative to the scan, the
projection and filtering are applied.
After some study, I have found that the statistics are (supposedly?)
computed *before* applying the Filter and Projection from the scan, so
let's document that better. Also, I found the schemas involved to be a bit
confusing.
I also would like to use this PR to validate my understanding of the
intended design
## What changes are included in this PR?
Update documentation
## Are these changes tested?
by CI
## Are there any user-facing changes?
Just documentation changes, no functional changes
---
datafusion/datasource/src/file.rs | 41 ++++++++++++++++--
datafusion/datasource/src/file_scan_config.rs | 59 ++++++++++++++++++++------
datafusion/datasource/src/source.rs | 6 +++
datafusion/physical-plan/src/execution_plan.rs | 1 +
4 files changed, 90 insertions(+), 17 deletions(-)
diff --git a/datafusion/datasource/src/file.rs
b/datafusion/datasource/src/file.rs
index c6282c3c7c..b41a456f1f 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -46,6 +46,12 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) ->
Arc<dyn FileSource>
/// file format specific behaviors for elements in [`DataSource`]
///
+/// # Schema information
+/// There are two important schemas for a [`FileSource`]:
+/// 1. [`Self::table_schema`] -- the schema for the overall "table"
+/// 2. The logical output schema, comprised of [`Self::table_schema`] with
+/// [`Self::projection`] applied
+///
/// See more details on specific implementations:
/// *
[`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
/// *
[`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
@@ -64,24 +70,38 @@ pub trait FileSource: Send + Sync {
) -> Result<Arc<dyn FileOpener>>;
/// Any
fn as_any(&self) -> &dyn Any;
+
/// Returns the table schema for this file source.
///
- /// This always returns the unprojected schema (the full schema of the
data).
+ /// This always returns the unprojected schema (the full schema of the
data)
+ /// without [`Self::projection`] applied.
+ ///
+ /// The output schema of this `FileSource` is this TableSchema
+ /// with [`Self::projection`] applied.
fn table_schema(&self) -> &crate::table_schema::TableSchema;
+
/// Initialize new type with batch size configuration
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
- /// Returns the filter expression that will be applied during the file
scan.
+
+ /// Returns the filter expression that will be applied *during* the file
scan.
+ ///
+ /// These expressions are in terms of the unprojected
[`Self::table_schema`].
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
None
}
- /// Return the projection that will be applied to the output stream on top
of the table schema.
+
+ /// Return the projection that will be applied to the output stream on top
+ /// of [`Self::table_schema`].
fn projection(&self) -> Option<&ProjectionExprs> {
None
}
+
/// Return execution plan metrics
fn metrics(&self) -> &ExecutionPlanMetricsSet;
+
/// String representation of file source such as "csv", "json", "parquet"
fn file_type(&self) -> &str;
+
/// Format FileType specific information
fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) ->
fmt::Result {
Ok(())
@@ -135,6 +155,19 @@ pub trait FileSource: Send + Sync {
}
/// Try to push down filters into this FileSource.
+ ///
+ /// `filters` must be in terms of the unprojected table schema (file schema
+ /// plus partition columns), before any projection is applied.
+ ///
+ /// Any filters that this FileSource chooses to evaluate itself should be
+ /// returned as `PushedDown::Yes` in the result, along with a FileSource
+ /// instance that incorporates those filters. Such filters are logically
+ /// applied "during" the file scan, meaning they may refer to columns not
+ /// included in the final output projection.
+ ///
+ /// Filters that cannot be pushed down should be marked as
`PushedDown::No`,
+ /// and will be evaluated by an execution plan after the file source.
+ ///
/// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]:
datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
@@ -220,7 +253,7 @@ pub trait FileSource: Send + Sync {
Ok(SortOrderPushdownResult::Unsupported)
}
- /// Try to push down a projection into a this FileSource.
+ /// Try to push down a projection into this FileSource.
///
/// `FileSource` implementations that support projection pushdown should
/// override this method and return a new `FileSource` instance with the
diff --git a/datafusion/datasource/src/file_scan_config.rs
b/datafusion/datasource/src/file_scan_config.rs
index fe78c0e526..80722755e6 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -55,10 +55,21 @@ use datafusion_physical_plan::{
use log::{debug, warn};
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult,
sync::Arc};
-/// The base configurations for a [`DataSourceExec`], the a physical plan for
-/// any given file format.
+/// [`FileScanConfig`] represents scanning data from a group of files
///
-/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`]
from a ``FileScanConfig`.
+/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan
+/// for scanning files with a particular file format.
+///
+/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible
+/// for creating the actual execution plan to read the files based on a
+/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent
+/// information about the files **before** any projection or filtering is
+/// applied in the file source.
+///
+/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`.
+///
+/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`]
from
+/// a `FileScanConfig`.
///
/// # Example
/// ```
@@ -169,8 +180,11 @@ pub struct FileScanConfig {
/// Expression adapter used to adapt filters and projections that are
pushed down into the scan
/// from the logical schema to the physical schema of the file.
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
- /// Unprojected statistics for the table (file schema + partition columns).
- /// These are projected on-demand via `projected_stats()`.
+ /// Statistics for the entire table (file schema + partition columns).
+ /// See [`FileScanConfigBuilder::with_statistics`] for more details.
+ ///
+ /// The effective statistics are computed on-demand via
+ /// [`ProjectionExprs::project_statistics`].
///
/// Note that this field is pub(crate) because accessing it directly from
outside
/// would be incorrect if there are filters being applied, thus this
should be accessed
@@ -283,17 +297,20 @@ impl FileScanConfigBuilder {
}
}
- /// Set the maximum number of records to read from this plan. If `None`,
- /// all records after filtering are returned.
+ /// Set the maximum number of records to read from this plan.
+ ///
+ /// If `None`, all records after filtering are returned.
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}
/// Set whether the limit should be order-sensitive.
+ ///
/// When `true`, files must be read in the exact order specified to produce
/// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
- /// DataFusion may reorder file processing for optimization without
affecting correctness.
+ /// DataFusion may reorder file processing for optimization without
+ /// affecting correctness.
pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
self.preserve_order = order_sensitive;
self
@@ -301,13 +318,14 @@ impl FileScanConfigBuilder {
/// Set the file source for scanning files.
///
- /// This method allows you to change the file source implementation (e.g.
ParquetSource, CsvSource, etc.)
- /// after the builder has been created.
+ /// This method allows you to change the file source implementation (e.g.
+ /// ParquetSource, CsvSource, etc.) after the builder has been created.
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
self.file_source = file_source;
self
}
+ /// Return the table schema
pub fn table_schema(&self) -> &SchemaRef {
self.file_source.table_schema().table_schema()
}
@@ -332,7 +350,12 @@ impl FileScanConfigBuilder {
/// Set the columns on which to project the data using column indices.
///
- /// Indexes that are higher than the number of columns of `file_schema`
refer to `table_partition_cols`.
+ /// This method attempts to push down the projection to the underlying file
+ /// source if supported. If the file source does not support projection
+ /// pushdown, an error is returned.
+ ///
+ /// Indexes that are higher than the number of columns of `file_schema`
+ /// refer to `table_partition_cols`.
pub fn with_projection_indices(
mut self,
indices: Option<Vec<usize>>,
@@ -371,8 +394,18 @@ impl FileScanConfigBuilder {
self
}
- /// Set the estimated overall statistics of the files, taking `filters`
into account.
- /// Defaults to [`Statistics::new_unknown`].
+ /// Set the statistics of the files, including partition
+ /// columns. Defaults to [`Statistics::new_unknown`].
+ ///
+ /// These statistics are for the entire table (file schema + partition
+ /// columns) before any projection or filtering is applied. Projections are
+ /// applied when statistics are retrieved, and if a filter is present,
+ /// [`FileScanConfig::statistics`] will mark the statistics as inexact
+ /// (counts are not adjusted).
+ ///
+ /// Projections and filters may be applied by the file source, either by
+ /// [`Self::with_projection_indices`] or a preexisting
+ /// [`FileSource::projection`] or [`FileSource::filter`].
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = Some(statistics);
self
diff --git a/datafusion/datasource/src/source.rs
b/datafusion/datasource/src/source.rs
index de18b6be22..71ddac84a8 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -178,7 +178,13 @@ pub trait DataSource: Send + Sync + Debug {
&self,
_projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn DataSource>>>;
+
/// Try to push down filters into this DataSource.
+ ///
+ /// These filters are in terms of the output schema of this DataSource
(e.g.
+ /// [`Self::eq_properties`] and output of any projections pushed into the
+ /// source), not the original table schema.
+ ///
/// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
///
/// [`ExecutionPlan::handle_child_pushdown_result`]:
datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
diff --git a/datafusion/physical-plan/src/execution_plan.rs
b/datafusion/physical-plan/src/execution_plan.rs
index 52f4829127..43cce0e5ea 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -577,6 +577,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
/// Handle the result of a child pushdown.
+ ///
/// This method is called as we recurse back up the plan tree after pushing
/// filters down to child nodes via
[`ExecutionPlan::gather_filters_for_pushdown`].
/// It allows the current node to process the results of filter pushdown
from
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]