This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b45934b10 Add `TableProvider::scan_with_args`  (#17336)
7b45934b10 is described below

commit 7b45934b10ada9f1881d1a38068d55ebb3b34e05
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Mon Sep 15 14:57:19 2025 -0400

    Add `TableProvider::scan_with_args`  (#17336)
---
 datafusion/catalog/src/table.rs                 | 139 ++++++++++++++++++++++++
 datafusion/core/src/datasource/listing/table.rs |  36 ++++--
 datafusion/core/src/physical_planner.rs         |  11 +-
 3 files changed, 176 insertions(+), 10 deletions(-)

diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index ac2e1884ba..11c9af01a7 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -171,6 +171,37 @@ pub trait TableProvider: Debug + Sync + Send {
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
+    /// Create an [`ExecutionPlan`] for scanning the table using structured arguments.
+    ///
+    /// This method uses [`ScanArgs`] to pass scan parameters in a structured way
+    /// and returns a [`ScanResult`] containing the execution plan.
+    ///
+    /// Table providers can override this method to take advantage of additional
+    /// parameters like the upcoming `preferred_ordering` that may not be
+    /// available through other scan methods.
+    ///
+    /// # Arguments
+    /// * `state` - The session state containing configuration and context
+    /// * `args` - Structured scan arguments including projection, filters, limit, and ordering preferences
+    ///
+    /// # Returns
+    /// A [`ScanResult`] containing the [`ExecutionPlan`] for scanning the table
+    ///
+    /// See [`Self::scan`] for detailed documentation about projection, filters, and limits.
+    async fn scan_with_args<'a>(
+        &self,
+        state: &dyn Session,
+        args: ScanArgs<'a>,
+    ) -> Result<ScanResult> {
+        let filters = args.filters().unwrap_or(&[]);
+        let projection = args.projection().map(|p| p.to_vec());
+        let limit = args.limit();
+        let plan = self
+            .scan(state, projection.as_ref(), filters, limit)
+            .await?;
+        Ok(plan.into())
+    }
+
     /// Specify if DataFusion should provide filter expressions to the
     /// TableProvider to apply *during* the scan.
     ///
@@ -299,6 +330,114 @@ pub trait TableProvider: Debug + Sync + Send {
     }
 }
 
+/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
+#[derive(Debug, Clone, Default)]
+pub struct ScanArgs<'a> {
+    filters: Option<&'a [Expr]>,
+    projection: Option<&'a [usize]>,
+    limit: Option<usize>,
+}
+
+impl<'a> ScanArgs<'a> {
+    /// Set the column projection for the scan.
+    ///
+    /// The projection is a list of column indices from [`TableProvider::schema`]
+    /// that should be included in the scan results. If `None`, all columns are included.
+    ///
+    /// # Arguments
+    /// * `projection` - Optional slice of column indices to project
+    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Get the column projection for the scan.
+    ///
+    /// Returns a reference to the projection column indices, or `None` if
+    /// no projection was specified (meaning all columns should be included).
+    pub fn projection(&self) -> Option<&'a [usize]> {
+        self.projection
+    }
+
+    /// Set the filter expressions for the scan.
+    ///
+    /// Filters are boolean expressions that should be evaluated during the scan
+    /// to reduce the number of rows returned. All expressions are combined with AND logic.
+    /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`].
+    ///
+    /// # Arguments
+    /// * `filters` - Optional slice of filter expressions
+    pub fn with_filters(mut self, filters: Option<&'a [Expr]>) -> Self {
+        self.filters = filters;
+        self
+    }
+
+    /// Get the filter expressions for the scan.
+    ///
+    /// Returns a reference to the filter expressions, or `None` if no filters were specified.
+    pub fn filters(&self) -> Option<&'a [Expr]> {
+        self.filters
+    }
+
+    /// Set the maximum number of rows to return from the scan.
+    ///
+    /// If specified, the scan should return at most this many rows. This is typically
+    /// used to optimize queries with `LIMIT` clauses.
+    ///
+    /// # Arguments
+    /// * `limit` - Optional maximum number of rows to return
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Get the maximum number of rows to return from the scan.
+    ///
+    /// Returns the row limit, or `None` if no limit was specified.
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+}
+
+/// Result of a table scan operation from [`TableProvider::scan_with_args`].
+#[derive(Debug, Clone)]
+pub struct ScanResult {
+    /// The ExecutionPlan to run.
+    plan: Arc<dyn ExecutionPlan>,
+}
+
+impl ScanResult {
+    /// Create a new `ScanResult` with the given execution plan.
+    ///
+    /// # Arguments
+    /// * `plan` - The execution plan that will perform the table scan
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        Self { plan }
+    }
+
+    /// Get a reference to the execution plan for this scan result.
+    ///
+    /// Returns a reference to the [`ExecutionPlan`] that will perform
+    /// the actual table scanning and data retrieval.
+    pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.plan
+    }
+
+    /// Consume this ScanResult and return the execution plan.
+    ///
+    /// Returns the owned [`ExecutionPlan`] that will perform
+    /// the actual table scanning and data retrieval.
+    pub fn into_inner(self) -> Arc<dyn ExecutionPlan> {
+        self.plan
+    }
+}
+
+impl From<Arc<dyn ExecutionPlan>> for ScanResult {
+    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
+        Self::new(plan)
+    }
+}
+
 /// A factory which creates [`TableProvider`]s at runtime given a URL.
 ///
 /// For example, this can be used to create a table "on the fly"
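
Taken together, the new pieces compose as follows from the caller's side. A
minimal sketch, assuming only the API introduced above; the `scan_example`
helper, its column indices, and the row limit are illustrative, not part of
this commit:

    use std::sync::Arc;

    use datafusion_catalog::{ScanArgs, Session, TableProvider};
    use datafusion_common::Result;
    use datafusion_physical_plan::ExecutionPlan;

    // Hypothetical helper: build structured scan arguments and run the scan.
    async fn scan_example(
        provider: &dyn TableProvider,
        session: &dyn Session,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Project two columns and cap the scan at 10 rows; filters are left
        // unset, matching ScanArgs::default().
        let projection = vec![0, 2];
        let args = ScanArgs::default()
            .with_projection(Some(projection.as_slice()))
            .with_limit(Some(10));

        // Providers that do not override scan_with_args fall back to the
        // default implementation, which forwards to TableProvider::scan.
        let result = provider.scan_with_args(session, args).await?;
        Ok(result.into_inner())
    }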
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 9858b10912..18d84c4ba0 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -29,7 +29,7 @@ use crate::{
 use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
 use arrow_schema::Schema;
 use async_trait::async_trait;
-use datafusion_catalog::{Session, TableProvider};
+use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
 use datafusion_common::{
     config_datafusion_err, config_err, internal_err, plan_err, project_schema,
     stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
@@ -1169,6 +1169,22 @@ impl TableProvider for ListingTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let options = ScanArgs::default()
+            .with_projection(projection.map(|p| p.as_slice()))
+            .with_filters(Some(filters))
+            .with_limit(limit);
+        Ok(self.scan_with_args(state, options).await?.into_inner())
+    }
+
+    async fn scan_with_args<'a>(
+        &self,
+        state: &dyn Session,
+        args: ScanArgs<'a>,
+    ) -> Result<ScanResult> {
+        let projection = args.projection().map(|p| p.to_vec());
+        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
+        let limit = args.limit();
+
         // extract types of partition columns
         let table_partition_cols = self
             .options
@@ -1181,6 +1197,7 @@ impl TableProvider for ListingTable {
             .iter()
             .map(|field| field.name().as_str())
             .collect::<Vec<_>>();
+
         // If the filters can be resolved using only partition cols, there is no need to
         // push them down to TableScan; otherwise, `unhandled` pruning predicates will be generated
         let (partition_filters, filters): (Vec<_>, Vec<_>) =
@@ -1198,8 +1215,8 @@ impl TableProvider for ListingTable {
 
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
-            let projected_schema = project_schema(&self.schema(), projection)?;
-            return Ok(Arc::new(EmptyExec::new(projected_schema)));
+            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
+            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
         }
 
         let output_ordering = self.try_create_output_ordering()?;
@@ -1233,13 +1250,16 @@ impl TableProvider for ListingTable {
         let Some(object_store_url) =
             self.table_paths.first().map(ListingTableUrl::object_store)
         else {
-            return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
+            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
+                Schema::empty(),
+            )))));
         };
 
         let file_source = self.create_file_source_with_schema_adapter()?;
 
         // create the execution plan
-        self.options
+        let plan = self
+            .options
             .format
             .create_physical_plan(
                 state,
@@ -1251,14 +1271,16 @@ impl TableProvider for ListingTable {
                 .with_file_groups(partitioned_file_lists)
                 .with_constraints(self.constraints.clone())
                 .with_statistics(statistics)
-                .with_projection(projection.cloned())
+                .with_projection(projection)
                 .with_limit(limit)
                 .with_output_ordering(output_ordering)
                 .with_table_partition_cols(table_partition_cols)
                 .with_expr_adapter(self.expr_adapter_factory.clone())
                 .build(),
             )
-            .await
+            .await?;
+
+        Ok(ScanResult::new(plan))
     }
 
     fn supports_filters_pushdown(
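
Note the migration pattern above, which other providers can copy when adopting
the new API: the positional `scan` becomes a thin wrapper that forwards to
`scan_with_args`, so both entry points share one implementation. The wrapper in
isolation (the surrounding `impl TableProvider` block and its imports are
elided):

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Translate the positional arguments into the structured form,
        // delegate, then unwrap the plan from the ScanResult.
        let args = ScanArgs::default()
            .with_projection(projection.map(|p| p.as_slice()))
            .with_filters(Some(filters))
            .with_limit(limit);
        Ok(self.scan_with_args(state, args).await?.into_inner())
    }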
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 6618d9495d..d7f30609a4 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by;
 use arrow::array::{builder::StringBuilder, RecordBatch};
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
+use datafusion_catalog::ScanArgs;
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
@@ -459,9 +460,13 @@ impl DefaultPhysicalPlanner {
                 // doesn't know (nor should care) how the relation was
                 // referred to in the query
                 let filters = unnormalize_cols(filters.iter().cloned());
-                source
-                    .scan(session_state, projection.as_ref(), &filters, *fetch)
-                    .await?
+                let filters_vec = filters.into_iter().collect::<Vec<_>>();
+                let opts = ScanArgs::default()
+                    .with_projection(projection.as_deref())
+                    .with_filters(Some(&filters_vec))
+                    .with_limit(*fetch);
+                let res = source.scan_with_args(session_state, opts).await?;
+                Arc::clone(res.plan())
             }
             LogicalPlan::Values(Values { values, schema }) => {
                 let exec_schema = schema.as_ref().to_owned().into();

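On the consuming side, as in the planner change above, a `ScanResult` can be
built explicitly with `new` or via the `From` impl, and it exposes the plan
either by reference or by value. A small sketch; the `handle_scan_result`
function is illustrative and assumes an already-built plan:

    use std::sync::Arc;

    use datafusion_catalog::ScanResult;
    use datafusion_physical_plan::ExecutionPlan;

    fn handle_scan_result(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        // Explicit construction, or the From impl that the default
        // scan_with_args relies on (`plan.into()`).
        let result = ScanResult::new(Arc::clone(&plan));
        let result_from: ScanResult = plan.into();

        // Borrow without consuming, as the planner does with `res.plan()` ...
        let _borrowed: &Arc<dyn ExecutionPlan> = result.plan();

        // ... or take ownership once nothing else needs the ScanResult.
        result_from.into_inner()
    }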
