adriangb commented on code in PR #20091:
URL: https://github.com/apache/datafusion/pull/20091#discussion_r2750153569
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
))
}
}
+
+ /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+ ///
+ /// This method handles projection pushdown by:
+ /// 1. Computing which columns the scan needs to produce
+ /// 2. Creating the scan with minimal required columns
+ /// 3. Applying any remainder projection (for complex expressions)
+ /// 4. Attempting to push non-async expressions into the scan via
`try_swapping_with_projection`
+ async fn plan_table_scan(
+ &self,
+ scan: &TableScan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let provider = source_as_provider(&scan.source)?;
+ let source_schema = scan.source.schema();
+
+ // Remove qualifiers from filters
+ let filters: Vec<Expr> =
unnormalize_cols(scan.filters.iter().cloned());
+
+ // Compute required column indices and remainder projection
+ let (remainder_projection, scan_indices) =
+ self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+ // Create the scan
+ let scan_args = ScanArgs::default()
+ .with_projection(Some(&scan_indices))
+ .with_filters(if filters.is_empty() {
+ None
+ } else {
+ Some(&filters)
+ })
+ .with_limit(scan.fetch);
+
+ let scan_result = provider.scan_with_args(session_state,
scan_args).await?;
+ let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+ // Wrap with ProjectionExec if remainder projection is needed
+ if let Some(ref proj_exprs) = remainder_projection {
+ let scan_output_schema = plan.schema();
+ let scan_df_schema =
DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+ let unnormalized_proj_exprs: Vec<Expr> =
+ unnormalize_cols(proj_exprs.iter().cloned());
+
+ // Classify expressions as async or non-async
+ let (async_indices, non_async_indices) =
self.classify_projection_exprs(
+ &unnormalized_proj_exprs,
+ &scan_df_schema,
+ session_state,
+ )?;
+
+ if async_indices.is_empty() {
+ // All expressions are non-async - try to push the entire
projection
+ let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ unnormalized_proj_exprs
+ .iter()
+ .map(|e| {
+ let physical = self.create_physical_expr(
+ e,
+ &scan_df_schema,
+ session_state,
+ )?;
+ let name = e.schema_name().to_string();
+ Ok((physical, name))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+ .into_iter()
+ .map(|(expr, alias)| ProjectionExpr { expr, alias })
+ .collect();
+ let projection_exec =
+ ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+ match plan.try_swapping_with_projection(&projection_exec)? {
+ Some(optimized_plan) => {
+ plan = optimized_plan;
+ }
+ None => {
+ plan = Arc::new(projection_exec);
+ }
+ }
+ } else if non_async_indices.is_empty() {
+ // All expressions are async - just create ProjectionExec with
AsyncFuncExec
+ plan = self.create_projection_exec(
+ &unnormalized_proj_exprs,
+ plan,
+ &scan_df_schema,
+ session_state,
+ )?;
+ } else {
+ // Mixed: push non-async expressions + columns needed by
async, keep async on top
Review Comment:
Need to make sure we have test coverage that hits this
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
))
}
}
+
+ /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+ ///
+ /// This method handles projection pushdown by:
+ /// 1. Computing which columns the scan needs to produce
+ /// 2. Creating the scan with minimal required columns
+ /// 3. Applying any remainder projection (for complex expressions)
+ /// 4. Attempting to push non-async expressions into the scan via
`try_swapping_with_projection`
+ async fn plan_table_scan(
+ &self,
+ scan: &TableScan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let provider = source_as_provider(&scan.source)?;
+ let source_schema = scan.source.schema();
+
+ // Remove qualifiers from filters
+ let filters: Vec<Expr> =
unnormalize_cols(scan.filters.iter().cloned());
+
+ // Compute required column indices and remainder projection
+ let (remainder_projection, scan_indices) =
+ self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+ // Create the scan
+ let scan_args = ScanArgs::default()
+ .with_projection(Some(&scan_indices))
+ .with_filters(if filters.is_empty() {
+ None
+ } else {
+ Some(&filters)
+ })
+ .with_limit(scan.fetch);
+
+ let scan_result = provider.scan_with_args(session_state,
scan_args).await?;
+ let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+ // Wrap with ProjectionExec if remainder projection is needed
+ if let Some(ref proj_exprs) = remainder_projection {
+ let scan_output_schema = plan.schema();
+ let scan_df_schema =
DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+ let unnormalized_proj_exprs: Vec<Expr> =
+ unnormalize_cols(proj_exprs.iter().cloned());
+
+ // Classify expressions as async or non-async
Review Comment:
Not sure if there's a better way to handle this. Async UDFs seem to be a bit
of a pain. I wonder if there are some existing helpers, etc. If not, I think I can
also encapsulate this pattern of "split the projection into a top remainder and an
inner one that matches a closure", since I see us using it a lot.
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1815,11 +1815,16 @@ impl LogicalPlan {
..
}) => {
let projected_fields = match projection {
- Some(indices) => {
- let schema = source.schema();
- let names: Vec<&str> = indices
+ Some(exprs) => {
+ let names: Vec<String> = exprs
.iter()
- .map(|i| schema.field(*i).name().as_str())
+ .map(|e| {
+ if let Expr::Column(col) = e {
+ col.name.clone()
+ } else {
+ e.schema_name().to_string()
+ }
Review Comment:
Note: this is in the Display impl
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
))
}
}
+
+ /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+ ///
+ /// This method handles projection pushdown by:
+ /// 1. Computing which columns the scan needs to produce
+ /// 2. Creating the scan with minimal required columns
+ /// 3. Applying any remainder projection (for complex expressions)
+ /// 4. Attempting to push non-async expressions into the scan via
`try_swapping_with_projection`
+ async fn plan_table_scan(
+ &self,
+ scan: &TableScan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let provider = source_as_provider(&scan.source)?;
+ let source_schema = scan.source.schema();
+
+ // Remove qualifiers from filters
+ let filters: Vec<Expr> =
unnormalize_cols(scan.filters.iter().cloned());
+
+ // Compute required column indices and remainder projection
+ let (remainder_projection, scan_indices) =
+ self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+ // Create the scan
+ let scan_args = ScanArgs::default()
+ .with_projection(Some(&scan_indices))
+ .with_filters(if filters.is_empty() {
+ None
+ } else {
+ Some(&filters)
+ })
+ .with_limit(scan.fetch);
+
+ let scan_result = provider.scan_with_args(session_state,
scan_args).await?;
+ let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+ // Wrap with ProjectionExec if remainder projection is needed
+ if let Some(ref proj_exprs) = remainder_projection {
+ let scan_output_schema = plan.schema();
+ let scan_df_schema =
DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+ let unnormalized_proj_exprs: Vec<Expr> =
+ unnormalize_cols(proj_exprs.iter().cloned());
+
+ // Classify expressions as async or non-async
+ let (async_indices, non_async_indices) =
self.classify_projection_exprs(
+ &unnormalized_proj_exprs,
+ &scan_df_schema,
+ session_state,
+ )?;
+
+ if async_indices.is_empty() {
+ // All expressions are non-async - try to push the entire
projection
+ let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ unnormalized_proj_exprs
+ .iter()
+ .map(|e| {
+ let physical = self.create_physical_expr(
+ e,
+ &scan_df_schema,
+ session_state,
+ )?;
+ let name = e.schema_name().to_string();
+ Ok((physical, name))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+ .into_iter()
+ .map(|(expr, alias)| ProjectionExpr { expr, alias })
+ .collect();
+ let projection_exec =
+ ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+ match plan.try_swapping_with_projection(&projection_exec)? {
+ Some(optimized_plan) => {
+ plan = optimized_plan;
+ }
+ None => {
+ plan = Arc::new(projection_exec);
+ }
+ }
+ } else if non_async_indices.is_empty() {
+ // All expressions are async - just create ProjectionExec with
AsyncFuncExec
+ plan = self.create_projection_exec(
+ &unnormalized_proj_exprs,
+ plan,
+ &scan_df_schema,
+ session_state,
+ )?;
+ } else {
+ // Mixed: push non-async expressions + columns needed by
async, keep async on top
+
+ // Collect columns needed by async expressions
+ let mut async_cols: HashSet<String> = HashSet::new();
+ for &idx in &async_indices {
+ unnormalized_proj_exprs[idx].apply(|e| {
+ if let Expr::Column(col) = e {
+ async_cols.insert(col.name().to_string());
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+ }
+
+ // Build pushdown projection: non-async exprs + columns for
async
+ let mut pushdown_exprs: Vec<Expr> = non_async_indices
+ .iter()
+ .map(|&i| unnormalized_proj_exprs[i].clone())
+ .collect();
+
+ // Add column references for async expression dependencies
+ for col_name in &async_cols {
+ if scan_output_schema.index_of(col_name).is_ok() {
+ let col_expr =
+
Expr::Column(Column::new_unqualified(col_name.clone()));
+ // Only add if not already in pushdown_exprs
+ if !pushdown_exprs.iter().any(|e| e == &col_expr) {
+ pushdown_exprs.push(col_expr);
+ }
+ }
+ }
+
+ // Create and try to push the non-async projection
+ let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
pushdown_exprs
+ .iter()
+ .map(|e| {
+ let physical =
+ self.create_physical_expr(e, &scan_df_schema,
session_state)?;
+ let name = e.schema_name().to_string();
+ Ok((physical, name))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+ .into_iter()
+ .map(|(expr, alias)| ProjectionExpr { expr, alias })
+ .collect();
+ let projection_exec =
+ ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+ match plan.try_swapping_with_projection(&projection_exec)? {
+ Some(optimized_plan) => {
+ plan = optimized_plan;
+ }
+ None => {
+ plan = Arc::new(projection_exec);
+ }
+ }
+
+ // Now apply the full projection (including async) on top
+ // The schema has changed, so we need a new DFSchema
+ let new_schema = plan.schema();
+ let new_df_schema =
DFSchema::try_from(new_schema.as_ref().clone())?;
+
+ // The final projection needs to produce the original output
+ plan = self.create_projection_exec(
+ &unnormalized_proj_exprs,
+ plan,
+ &new_df_schema,
+ session_state,
+ )?;
+ }
+ }
+
+ Ok(plan)
+ }
+
+ /// Compute the column indices needed for the scan based on projection
expressions.
+ ///
+ /// Returns a tuple of:
+ /// - `Option<Vec<Expr>>`: Remainder projection to apply on top of the
scan output.
+ /// `None` if the projection is all simple column references
(reordering, dropping, etc.)
+ /// - `Vec<usize>`: Column indices to scan from the source.
+ fn compute_scan_projection(
+ &self,
+ projection: &Option<Vec<Expr>>,
+ source_schema: &Schema,
+ ) -> Result<(Option<Vec<Expr>>, Vec<usize>)> {
+ let Some(exprs) = projection else {
+ // None means scan all columns, no remainder needed
+ return Ok((None, (0..source_schema.fields().len()).collect()));
+ };
+
+ if exprs.is_empty() {
+ return Ok((None, vec![]));
+ }
+
+ let mut has_complex_expr = false;
+ let mut all_required_columns = BTreeSet::new();
+ let mut remainder_exprs = vec![];
+
+ for expr in exprs {
+ // Collect all column references from this expression
+ let mut is_complex_expr = false;
+ expr.apply(|e| {
+ if let Expr::Column(col) = e {
+ if let Ok(index) = source_schema.index_of(col.name()) {
+ // If we made it this far this must be the first level
and the whole expression is a simple column reference
+ // But we don't know if subsequent expressions might
have more complex expressions necessitating `remainder_exprs`
+ // to be populated, so we push to `remainder_exprs`
just in case they are needed later.
+ // It is simpler to do this now than to try to
backtrack later since we already matched into Expr::Column
+ // and thus can simply clone `expr` here.
+ // If `is_complex_expr` is true then we will append
the complex expression itself to `remainder_exprs` instead
+ // later once we've fully traversed this expression.
+ if !is_complex_expr {
+ remainder_exprs.push(expr.clone());
+ }
+ all_required_columns.insert(index);
+ }
+ } else {
+ // Nothing to do here except note that we will have to
append the full expression later
+ is_complex_expr = true;
+ }
+ Ok(TreeNodeRecursion::Continue)
+ })?;
+ if is_complex_expr {
+ // If any expression in the projection is not a simple column
reference we will need to apply a remainder projection
+ has_complex_expr = true;
+ // Append the full expression itself to the remainder
expressions
+ // So given a projection like `[a, a + c, d]` we would have:
+ // all_required_columns = {0, 2, 3}
+ // original schema: [a: Int, b: Int, c: Int, d: Int]
+ // projected schema: [a: Int, c: Int, d: Int]
+ // remainder_exprs = [col(a), col(a) + col(c), col(d)]
+ remainder_exprs.push(expr.clone());
+ }
+ }
+
+ Ok((
+ has_complex_expr.then_some(remainder_exprs),
+ all_required_columns.into_iter().collect(),
+ ))
+ }
+
+ /// Classifies projection expressions into async and non-async.
+ ///
+ /// Returns `(async_expr_indices, non_async_expr_indices)` where:
+ /// - `async_expr_indices`: indices of expressions that contain async UDFs
+ /// - `non_async_expr_indices`: indices of expressions that are purely
synchronous
+ fn classify_projection_exprs(
+ &self,
+ exprs: &[Expr],
+ input_dfschema: &DFSchema,
+ session_state: &SessionState,
+ ) -> Result<(Vec<usize>, Vec<usize>)> {
+ let mut async_indices = vec![];
+ let mut non_async_indices = vec![];
+ let schema = input_dfschema.as_arrow();
+
+ for (i, expr) in exprs.iter().enumerate() {
+ let physical =
+ self.create_physical_expr(expr, input_dfschema,
session_state)?;
+
+ // Use AsyncMapper to check if expression has async UDFs
+ let mut async_map = AsyncMapper::new(schema.fields().len());
+ async_map.find_references(&physical, schema)?;
+
+ if async_map.is_empty() {
+ non_async_indices.push(i);
+ } else {
+ async_indices.push(i);
+ }
+ }
+
+ Ok((async_indices, non_async_indices))
+ }
+
+ /// Creates a ProjectionExec from logical expressions, handling async UDF
expressions.
+ ///
+ /// If the expressions contain async UDFs, wraps them with `AsyncFuncExec`.
+ fn create_projection_exec(
+ &self,
+ exprs: &[Expr],
+ input: Arc<dyn ExecutionPlan>,
+ input_dfschema: &DFSchema,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
+ .iter()
+ .map(|e| {
+ let physical =
+ self.create_physical_expr(e, input_dfschema,
session_state)?;
+ let name = e.schema_name().to_string();
+ Ok((physical, name))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let num_input_columns = input.schema().fields().len();
+ let input_schema = input.schema();
+
+ match self.try_plan_async_exprs(
Review Comment:
Need to investigate how `try_plan_async_exprs` works and how it overlaps with the
projection splitting above.
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2807,14 +2829,129 @@ impl TableScan {
Ok(Self {
Review Comment:
Wonder if we could reuse TableScanBuilder here?
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
))
}
}
+
+ /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+ ///
+ /// This method handles projection pushdown by:
+ /// 1. Computing which columns the scan needs to produce
+ /// 2. Creating the scan with minimal required columns
+ /// 3. Applying any remainder projection (for complex expressions)
+ /// 4. Attempting to push non-async expressions into the scan via
`try_swapping_with_projection`
+ async fn plan_table_scan(
+ &self,
+ scan: &TableScan,
+ session_state: &SessionState,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let provider = source_as_provider(&scan.source)?;
+ let source_schema = scan.source.schema();
+
+ // Remove qualifiers from filters
+ let filters: Vec<Expr> =
unnormalize_cols(scan.filters.iter().cloned());
+
+ // Compute required column indices and remainder projection
+ let (remainder_projection, scan_indices) =
+ self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+ // Create the scan
+ let scan_args = ScanArgs::default()
+ .with_projection(Some(&scan_indices))
+ .with_filters(if filters.is_empty() {
+ None
+ } else {
+ Some(&filters)
+ })
+ .with_limit(scan.fetch);
+
+ let scan_result = provider.scan_with_args(session_state,
scan_args).await?;
+ let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+ // Wrap with ProjectionExec if remainder projection is needed
+ if let Some(ref proj_exprs) = remainder_projection {
+ let scan_output_schema = plan.schema();
+ let scan_df_schema =
DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+ let unnormalized_proj_exprs: Vec<Expr> =
+ unnormalize_cols(proj_exprs.iter().cloned());
+
+ // Classify expressions as async or non-async
+ let (async_indices, non_async_indices) =
self.classify_projection_exprs(
+ &unnormalized_proj_exprs,
+ &scan_df_schema,
+ session_state,
+ )?;
+
+ if async_indices.is_empty() {
+ // All expressions are non-async - try to push the entire
projection
+ let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+ unnormalized_proj_exprs
+ .iter()
+ .map(|e| {
+ let physical = self.create_physical_expr(
+ e,
+ &scan_df_schema,
+ session_state,
+ )?;
+ let name = e.schema_name().to_string();
+ Ok((physical, name))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+ .into_iter()
+ .map(|(expr, alias)| ProjectionExpr { expr, alias })
+ .collect();
+ let projection_exec =
+ ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+ match plan.try_swapping_with_projection(&projection_exec)? {
Review Comment:
Might move this out of this PR. The idea is that once we have projections in
`TableScan` then the projection pushdown optimizer can handle pushing down
`get_field` expressions all the way into `TableScan` (past any filters) and
then we can here push those into the scan itself (if the scan accepts them).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]