alamb commented on code in PR #17336:
URL: https://github.com/apache/datafusion/pull/17336#discussion_r2344142167
##########
datafusion/catalog/src/table.rs:
##########
@@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send {
}
}
+/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
+///
+/// `ScanArgs` provides a structured way to pass scan parameters to table
providers,
+/// replacing the multiple individual parameters used by
[`TableProvider::scan`].
+/// This struct uses the builder pattern for convenient construction.
+///
+/// # Examples
+///
+/// ```
+/// # use datafusion_catalog::ScanArgs;
+/// # use datafusion_expr::Expr;
+/// let args = ScanArgs::default()
+/// .with_projection(Some(vec![0, 2, 4]))
+/// .with_limit(Some(1000));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct ScanArgs {
+ filters: Option<Vec<Expr>>,
+ projection: Option<Vec<usize>>,
+ limit: Option<usize>,
+}
+
+impl ScanArgs {
+ /// Set the column projection for the scan.
+ ///
+ /// The projection is a list of column indices from
[`TableProvider::schema`]
+ /// that should be included in the scan results. If `None`, all columns
are included.
+ ///
+ /// # Arguments
+ /// * `projection` - Optional list of column indices to project
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Get the column projection for the scan.
+ ///
+ /// Returns a cloned copy of the projection column indices, or `None` if
+ /// no projection was specified (meaning all columns should be included).
+ pub fn projection(&self) -> Option<Vec<usize>> {
Review Comment:
```suggestion
pub fn projection(&self) -> Option<&Vec<usize>> {
```
##########
datafusion/catalog/src/table.rs:
##########
@@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send {
}
}
+/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
+///
+/// `ScanArgs` provides a structured way to pass scan parameters to table
providers,
+/// replacing the multiple individual parameters used by
[`TableProvider::scan`].
+/// This struct uses the builder pattern for convenient construction.
+///
+/// # Examples
+///
+/// ```
+/// # use datafusion_catalog::ScanArgs;
+/// # use datafusion_expr::Expr;
+/// let args = ScanArgs::default()
+/// .with_projection(Some(vec![0, 2, 4]))
+/// .with_limit(Some(1000));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct ScanArgs {
+ filters: Option<Vec<Expr>>,
+ projection: Option<Vec<usize>>,
+ limit: Option<usize>,
+}
+
+impl ScanArgs {
+ /// Set the column projection for the scan.
+ ///
+ /// The projection is a list of column indices from
[`TableProvider::schema`]
+ /// that should be included in the scan results. If `None`, all columns
are included.
+ ///
+ /// # Arguments
+ /// * `projection` - Optional list of column indices to project
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Get the column projection for the scan.
+ ///
+ /// Returns a cloned copy of the projection column indices, or `None` if
+ /// no projection was specified (meaning all columns should be included).
+ pub fn projection(&self) -> Option<Vec<usize>> {
Review Comment:
I think this should return a reference (`Option<&Vec<usize>>`) to avoid
forcing clones even if the caller doesn't need a copy
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -459,9 +460,12 @@ impl DefaultPhysicalPlanner {
// doesn't know (nor should care) how the relation was
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
- source
- .scan(session_state, projection.as_ref(), &filters, *fetch)
- .await?
+ let opts = ScanArgs::default()
+ .with_projection(projection.clone())
Review Comment:
is there a reason we need to cone the projection here? I think it would be
better and avoid the clone and pass in the arguments via reference (so
something like `ScanArgs<'a>`)
##########
datafusion/catalog/src/table.rs:
##########
@@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send {
}
}
+/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
+///
+/// `ScanArgs` provides a structured way to pass scan parameters to table
providers,
+/// replacing the multiple individual parameters used by
[`TableProvider::scan`].
+/// This struct uses the builder pattern for convenient construction.
+///
+/// # Examples
+///
+/// ```
+/// # use datafusion_catalog::ScanArgs;
+/// # use datafusion_expr::Expr;
+/// let args = ScanArgs::default()
+/// .with_projection(Some(vec![0, 2, 4]))
+/// .with_limit(Some(1000));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct ScanArgs {
+ filters: Option<Vec<Expr>>,
+ projection: Option<Vec<usize>>,
+ limit: Option<usize>,
+}
+
+impl ScanArgs {
+ /// Set the column projection for the scan.
+ ///
+ /// The projection is a list of column indices from
[`TableProvider::schema`]
+ /// that should be included in the scan results. If `None`, all columns
are included.
+ ///
+ /// # Arguments
+ /// * `projection` - Optional list of column indices to project
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Get the column projection for the scan.
+ ///
+ /// Returns a cloned copy of the projection column indices, or `None` if
+ /// no projection was specified (meaning all columns should be included).
+ pub fn projection(&self) -> Option<Vec<usize>> {
+ self.projection.clone()
+ }
+
+ /// Set the filter expressions for the scan.
+ ///
+ /// Filters are boolean expressions that should be evaluated during the
scan
+ /// to reduce the number of rows returned. All expressions are combined
with AND logic.
+ /// Whether filters are actually pushed down depends on
[`TableProvider::supports_filters_pushdown`].
+ ///
+ /// # Arguments
+ /// * `filters` - Optional list of filter expressions
+ pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self {
+ self.filters = filters;
+ self
+ }
+
+ /// Get the filter expressions for the scan.
+ ///
+ /// Returns a reference to the filter expressions, or `None` if no filters
were specified.
+ pub fn filters(&self) -> Option<&[Expr]> {
+ self.filters.as_deref()
+ }
+
+ /// Set the maximum number of rows to return from the scan.
+ ///
+ /// If specified, the scan should return at most this many rows. This is
typically
+ /// used to optimize queries with `LIMIT` clauses.
+ ///
+ /// # Arguments
+ /// * `limit` - Optional maximum number of rows to return
+ pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ /// Get the maximum number of rows to return from the scan.
+ ///
+ /// Returns the row limit, or `None` if no limit was specified.
+ pub fn limit(&self) -> Option<usize> {
+ self.limit
+ }
+}
+
+/// Result of a table scan operation from [`TableProvider::scan_with_args`].
+///
+/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan,
+/// providing a typed return value instead of returning the plan directly.
+/// This allows for future extensibility of scan results without breaking
+/// the API.
Review Comment:
How much value does this comment add? While accurate I don't think it
provides much value and just makes the code harder to read (sounds like an LLM
wrote it lol)
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1166,6 +1166,22 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let options = ScanArgs::default()
+ .with_projection(projection.cloned())
+ .with_filters(Some(filters.to_vec()))
+ .with_limit(limit);
+ Ok(self.scan_with_args(state, options).await?.plan())
+ }
Review Comment:
I think the recursion error is unfortunate -- maybe we could avoid problems
with good enough documentation but it seems like a rough edge on one of the
core experiences (TableProvider)
Here is an alternate proposal:
1. leave `scan` required (and leave `scan_with_args` in terms of `scan`)
Once we are happy with `scan_with_args` we can then properly deprecate
(maybe even remove `scan`) in favor of `scan_with_args` as a follow on PR (in
some future PR)
This approach has the downside that it is more complicated to implement
`scan_with_args` as you will be forced to also provide a `not implemented
error` or something.
The upside is that there is no change needed to existing code and those who
aren't yet interested in scan_with_args won't need a change
> I also chose not to deprecate scan() to (1) avoid a bunch of code churn
mostly for us and (2) until we are confident that scan_with_args is a workable
evolution
This seems like a good idea to me
##########
datafusion/catalog/src/table.rs:
##########
@@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send {
}
}
+/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
+///
+/// `ScanArgs` provides a structured way to pass scan parameters to table
providers,
+/// replacing the multiple individual parameters used by
[`TableProvider::scan`].
+/// This struct uses the builder pattern for convenient construction.
+///
+/// # Examples
+///
+/// ```
+/// # use datafusion_catalog::ScanArgs;
+/// # use datafusion_expr::Expr;
+/// let args = ScanArgs::default()
+/// .with_projection(Some(vec![0, 2, 4]))
+/// .with_limit(Some(1000));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct ScanArgs {
+ filters: Option<Vec<Expr>>,
+ projection: Option<Vec<usize>>,
+ limit: Option<usize>,
+}
+
+impl ScanArgs {
+ /// Set the column projection for the scan.
+ ///
+ /// The projection is a list of column indices from
[`TableProvider::schema`]
+ /// that should be included in the scan results. If `None`, all columns
are included.
+ ///
+ /// # Arguments
+ /// * `projection` - Optional list of column indices to project
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Get the column projection for the scan.
+ ///
+ /// Returns a cloned copy of the projection column indices, or `None` if
+ /// no projection was specified (meaning all columns should be included).
+ pub fn projection(&self) -> Option<Vec<usize>> {
+ self.projection.clone()
+ }
+
+ /// Set the filter expressions for the scan.
+ ///
+ /// Filters are boolean expressions that should be evaluated during the
scan
+ /// to reduce the number of rows returned. All expressions are combined
with AND logic.
+ /// Whether filters are actually pushed down depends on
[`TableProvider::supports_filters_pushdown`].
+ ///
+ /// # Arguments
+ /// * `filters` - Optional list of filter expressions
+ pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self {
+ self.filters = filters;
+ self
+ }
+
+ /// Get the filter expressions for the scan.
+ ///
+ /// Returns a reference to the filter expressions, or `None` if no filters
were specified.
+ pub fn filters(&self) -> Option<&[Expr]> {
+ self.filters.as_deref()
+ }
+
+ /// Set the maximum number of rows to return from the scan.
+ ///
+ /// If specified, the scan should return at most this many rows. This is
typically
+ /// used to optimize queries with `LIMIT` clauses.
+ ///
+ /// # Arguments
+ /// * `limit` - Optional maximum number of rows to return
+ pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ /// Get the maximum number of rows to return from the scan.
+ ///
+ /// Returns the row limit, or `None` if no limit was specified.
+ pub fn limit(&self) -> Option<usize> {
+ self.limit
+ }
+}
+
+/// Result of a table scan operation from [`TableProvider::scan_with_args`].
+///
+/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan,
+/// providing a typed return value instead of returning the plan directly.
+/// This allows for future extensibility of scan results without breaking
+/// the API.
+#[derive(Debug, Clone)]
+pub struct ScanResult {
+ /// The ExecutionPlan to run.
+ plan: Arc<dyn ExecutionPlan>,
+}
+
+impl ScanResult {
+ /// Create a new `ScanResult` with the given execution plan.
+ ///
+ /// # Arguments
+ /// * `plan` - The execution plan that will perform the table scan
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ Self { plan }
+ }
+
+ /// Get the execution plan for this scan result.
+ ///
+ /// Returns a cloned reference to the [`ExecutionPlan`] that will perform
+ /// the actual table scanning and data retrieval.
+ pub fn plan(&self) -> Arc<dyn ExecutionPlan> {
Review Comment:
I also recommend here providing this as a reference as well as providing a
way to get the plan without a clone
```suggestion
pub fn plan(&self) -> &Arc<dyn ExecutionPlan> {
```
And then also add a function like `ScanResult::into_inner` to return the
inner plan
##########
datafusion/catalog/src/table.rs:
##########
@@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send {
}
}
+/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
+///
+/// `ScanArgs` provides a structured way to pass scan parameters to table
providers,
+/// replacing the multiple individual parameters used by
[`TableProvider::scan`].
+/// This struct uses the builder pattern for convenient construction.
+///
+/// # Examples
+///
+/// ```
+/// # use datafusion_catalog::ScanArgs;
+/// # use datafusion_expr::Expr;
+/// let args = ScanArgs::default()
+/// .with_projection(Some(vec![0, 2, 4]))
+/// .with_limit(Some(1000));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct ScanArgs {
+ filters: Option<Vec<Expr>>,
+ projection: Option<Vec<usize>>,
+ limit: Option<usize>,
+}
+
+impl ScanArgs {
+ /// Set the column projection for the scan.
+ ///
+ /// The projection is a list of column indices from
[`TableProvider::schema`]
+ /// that should be included in the scan results. If `None`, all columns
are included.
+ ///
+ /// # Arguments
+ /// * `projection` - Optional list of column indices to project
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Get the column projection for the scan.
+ ///
+ /// Returns a cloned copy of the projection column indices, or `None` if
+ /// no projection was specified (meaning all columns should be included).
+ pub fn projection(&self) -> Option<Vec<usize>> {
+ self.projection.clone()
+ }
+
+ /// Set the filter expressions for the scan.
+ ///
+ /// Filters are boolean expressions that should be evaluated during the
scan
+ /// to reduce the number of rows returned. All expressions are combined
with AND logic.
+ /// Whether filters are actually pushed down depends on
[`TableProvider::supports_filters_pushdown`].
+ ///
+ /// # Arguments
+ /// * `filters` - Optional list of filter expressions
+ pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self {
+ self.filters = filters;
+ self
+ }
+
+ /// Get the filter expressions for the scan.
+ ///
+ /// Returns a reference to the filter expressions, or `None` if no filters
were specified.
+ pub fn filters(&self) -> Option<&[Expr]> {
Review Comment:
(minor) I recommend returing the raw Vec reference so the caller can use Vec
specific functions
```suggestion
pub fn filters(&self) -> Option<&Vec<Expr>> {
```
##########
datafusion/catalog/src/table.rs:
##########
@@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send {
}
}
+/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
+///
+/// `ScanArgs` provides a structured way to pass scan parameters to table
providers,
+/// replacing the multiple individual parameters used by
[`TableProvider::scan`].
+/// This struct uses the builder pattern for convenient construction.
+///
+/// # Examples
+///
+/// ```
+/// # use datafusion_catalog::ScanArgs;
+/// # use datafusion_expr::Expr;
+/// let args = ScanArgs::default()
+/// .with_projection(Some(vec![0, 2, 4]))
+/// .with_limit(Some(1000));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct ScanArgs {
+ filters: Option<Vec<Expr>>,
+ projection: Option<Vec<usize>>,
+ limit: Option<usize>,
+}
+
+impl ScanArgs {
+ /// Set the column projection for the scan.
+ ///
+ /// The projection is a list of column indices from
[`TableProvider::schema`]
+ /// that should be included in the scan results. If `None`, all columns
are included.
+ ///
+ /// # Arguments
+ /// * `projection` - Optional list of column indices to project
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Get the column projection for the scan.
+ ///
+ /// Returns a cloned copy of the projection column indices, or `None` if
+ /// no projection was specified (meaning all columns should be included).
+ pub fn projection(&self) -> Option<Vec<usize>> {
+ self.projection.clone()
+ }
+
+ /// Set the filter expressions for the scan.
+ ///
+ /// Filters are boolean expressions that should be evaluated during the
scan
+ /// to reduce the number of rows returned. All expressions are combined
with AND logic.
+ /// Whether filters are actually pushed down depends on
[`TableProvider::supports_filters_pushdown`].
+ ///
+ /// # Arguments
+ /// * `filters` - Optional list of filter expressions
+ pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self {
+ self.filters = filters;
+ self
+ }
+
+ /// Get the filter expressions for the scan.
+ ///
+ /// Returns a reference to the filter expressions, or `None` if no filters
were specified.
+ pub fn filters(&self) -> Option<&[Expr]> {
+ self.filters.as_deref()
+ }
+
+ /// Set the maximum number of rows to return from the scan.
+ ///
+ /// If specified, the scan should return at most this many rows. This is
typically
+ /// used to optimize queries with `LIMIT` clauses.
+ ///
+ /// # Arguments
+ /// * `limit` - Optional maximum number of rows to return
+ pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ /// Get the maximum number of rows to return from the scan.
+ ///
+ /// Returns the row limit, or `None` if no limit was specified.
+ pub fn limit(&self) -> Option<usize> {
+ self.limit
+ }
+}
+
+/// Result of a table scan operation from [`TableProvider::scan_with_args`].
+///
+/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan,
+/// providing a typed return value instead of returning the plan directly.
+/// This allows for future extensibility of scan results without breaking
+/// the API.
+#[derive(Debug, Clone)]
+pub struct ScanResult {
+ /// The ExecutionPlan to run.
+ plan: Arc<dyn ExecutionPlan>,
+}
+
+impl ScanResult {
+ /// Create a new `ScanResult` with the given execution plan.
+ ///
+ /// # Arguments
+ /// * `plan` - The execution plan that will perform the table scan
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ Self { plan }
+ }
+
+ /// Get the execution plan for this scan result.
+ ///
+ /// Returns a cloned reference to the [`ExecutionPlan`] that will perform
+ /// the actual table scanning and data retrieval.
+ pub fn plan(&self) -> Arc<dyn ExecutionPlan> {
Review Comment:
Another nice usability thing would be to provide a `From` impl so we can
make thse like `ScanResult::from(plan)`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]