This is an automated email from the ASF dual-hosted git repository. adriangb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 7b45934b10 Add `TableProvider::scan_with_args` (#17336) 7b45934b10 is described below commit 7b45934b10ada9f1881d1a38068d55ebb3b34e05 Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com> AuthorDate: Mon Sep 15 14:57:19 2025 -0400 Add `TableProvider::scan_with_args` (#17336) --- datafusion/catalog/src/table.rs | 139 ++++++++++++++++++++++++ datafusion/core/src/datasource/listing/table.rs | 36 ++++-- datafusion/core/src/physical_planner.rs | 11 +- 3 files changed, 176 insertions(+), 10 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index ac2e1884ba..11c9af01a7 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -171,6 +171,37 @@ pub trait TableProvider: Debug + Sync + Send { limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>>; + /// Create an [`ExecutionPlan`] for scanning the table using structured arguments. + /// + /// This method uses [`ScanArgs`] to pass scan parameters in a structured way + /// and returns a [`ScanResult`] containing the execution plan. + /// + /// Table providers can override this method to take advantage of additional + /// parameters like the upcoming `preferred_ordering` that may not be available through + /// other scan methods. + /// + /// # Arguments + /// * `state` - The session state containing configuration and context + /// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences + /// + /// # Returns + /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table + /// + /// See [`Self::scan`] for detailed documentation about projection, filters, and limits. + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result<ScanResult> { + let filters = args.filters().unwrap_or(&[]); + let projection = args.projection().map(|p| p.to_vec()); + let limit = args.limit(); + let plan = self + .scan(state, projection.as_ref(), filters, limit) + .await?; + Ok(plan.into()) + } + /// Specify if DataFusion should provide filter expressions to the /// TableProvider to apply *during* the scan. /// @@ -299,6 +330,114 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. +#[derive(Debug, Clone, Default)] +pub struct ScanArgs<'a> { + filters: Option<&'a [Expr]>, + projection: Option<&'a [usize]>, + limit: Option<usize>, +} + +impl<'a> ScanArgs<'a> { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional slice of column indices to project + pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a reference to the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option<&'a [usize]> { + self.projection + } + + /// Set the filter expressions for the scan. + /// + /// Filters are boolean expressions that should be evaluated during the scan + /// to reduce the number of rows returned. All expressions are combined with AND logic. + /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`]. + /// + /// # Arguments + /// * `filters` - Optional slice of filter expressions + pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self { + self.filters = filters; + self + } + + /// Get the filter expressions for the scan. + /// + /// Returns a reference to the filter expressions, or `None` if no filters were specified. + pub fn filters(&self) -> Option<&'a [Expr]> { + self.filters + } + + /// Set the maximum number of rows to return from the scan. + /// + /// If specified, the scan should return at most this many rows. This is typically + /// used to optimize queries with `LIMIT` clauses. + /// + /// # Arguments + /// * `limit` - Optional maximum number of rows to return + pub fn with_limit(mut self, limit: Option<usize>) -> Self { + self.limit = limit; + self + } + + /// Get the maximum number of rows to return from the scan. + /// + /// Returns the row limit, or `None` if no limit was specified. + pub fn limit(&self) -> Option<usize> { + self.limit + } +} + +/// Result of a table scan operation from [`TableProvider::scan_with_args`]. +#[derive(Debug, Clone)] +pub struct ScanResult { + /// The ExecutionPlan to run. + plan: Arc<dyn ExecutionPlan>, +} + +impl ScanResult { + /// Create a new `ScanResult` with the given execution plan. + /// + /// # Arguments + /// * `plan` - The execution plan that will perform the table scan + pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self { + Self { plan } + } + + /// Get a reference to the execution plan for this scan result. + /// + /// Returns a reference to the [`ExecutionPlan`] that will perform + /// the actual table scanning and data retrieval. + pub fn plan(&self) -> &Arc<dyn ExecutionPlan> { + &self.plan + } + + /// Consume this ScanResult and return the execution plan. + /// + /// Returns the owned [`ExecutionPlan`] that will perform + /// the actual table scanning and data retrieval. + pub fn into_inner(self) -> Arc<dyn ExecutionPlan> { + self.plan + } +} + +impl From<Arc<dyn ExecutionPlan>> for ScanResult { + fn from(plan: Arc<dyn ExecutionPlan>) -> Self { + Self::new(plan) + } +} + /// A factory which creates [`TableProvider`]s at runtime given a URL. /// /// For example, this can be used to create a table "on the fly" diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9858b10912..18d84c4ba0 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -29,7 +29,7 @@ use crate::{ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; use async_trait::async_trait; -use datafusion_catalog::{Session, TableProvider}; +use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::{ config_datafusion_err, config_err, internal_err, plan_err, project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt, @@ -1169,6 +1169,22 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { + let options = ScanArgs::default() + .with_projection(projection.map(|p| p.as_slice())) + .with_filters(Some(filters)) + .with_limit(limit); + Ok(self.scan_with_args(state, options).await?.into_inner()) + } + + async fn scan_with_args<'a>( + &self, + state: &dyn Session, + args: ScanArgs<'a>, + ) -> Result<ScanResult> { + let projection = args.projection().map(|p| p.to_vec()); + let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default(); + let limit = args.limit(); + // extract types of partition columns let table_partition_cols = self .options @@ -1181,6 +1197,7 @@ impl TableProvider for ListingTable { .iter() .map(|field| field.name().as_str()) .collect::<Vec<_>>(); + // If the filters can be resolved using only partition cols, there is no need to // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated let (partition_filters, filters): (Vec<_>, Vec<_>) = @@ -1198,8 +1215,8 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection)?; - return Ok(Arc::new(EmptyExec::new(projected_schema))); + let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } let output_ordering = self.try_create_output_ordering()?; @@ -1233,13 +1250,16 @@ impl TableProvider for ListingTable { let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { - return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); + return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new( + Schema::empty(), + ))))); }; let file_source = self.create_file_source_with_schema_adapter()?; // create the execution plan - self.options + let plan = self + .options .format .create_physical_plan( state, @@ -1251,14 +1271,16 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection(projection.cloned()) + .with_projection(projection) .with_limit(limit) .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) .with_expr_adapter(self.expr_adapter_factory.clone()) .build(), ) - .await + .await?; + + Ok(ScanResult::new(plan)) } fn supports_filters_pushdown( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6618d9495d..d7f30609a4 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; +use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor, @@ -459,9 +460,13 @@ impl DefaultPhysicalPlanner { // doesn't know (nor should care) how the relation was // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); - source - .scan(session_state, projection.as_ref(), &filters, *fetch) - .await? + let filters_vec = filters.into_iter().collect::<Vec<_>>(); + let opts = ScanArgs::default() + .with_projection(projection.as_deref()) + .with_filters(Some(&filters_vec)) + .with_limit(*fetch); + let res = source.scan_with_args(session_state, opts).await?; + Arc::clone(res.plan()) } LogicalPlan::Values(Values { values, schema }) => { let exec_schema = schema.as_ref().to_owned().into(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org