adriangb commented on code in PR #20091:
URL: https://github.com/apache/datafusion/pull/20091#discussion_r2750153569


##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
             ))
         }
     }
+
+    /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+    ///
+    /// This method handles projection pushdown by:
+    /// 1. Computing which columns the scan needs to produce
+    /// 2. Creating the scan with minimal required columns
+    /// 3. Applying any remainder projection (for complex expressions)
+    /// 4. Attempting to push non-async expressions into the scan via `try_swapping_with_projection`
+    async fn plan_table_scan(
+        &self,
+        scan: &TableScan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove qualifiers from filters
+        let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+        // Compute required column indices and remainder projection
+        let (remainder_projection, scan_indices) =
+            self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(Some(&scan_indices))
+            .with_filters(if filters.is_empty() {
+                None
+            } else {
+                Some(&filters)
+            })
+            .with_limit(scan.fetch);
+
+        let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Wrap with ProjectionExec if remainder projection is needed
+        if let Some(ref proj_exprs) = remainder_projection {
+            let scan_output_schema = plan.schema();
+            let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+            let unnormalized_proj_exprs: Vec<Expr> =
+                unnormalize_cols(proj_exprs.iter().cloned());
+
+            // Classify expressions as async or non-async
+            let (async_indices, non_async_indices) = self.classify_projection_exprs(
+                &unnormalized_proj_exprs,
+                &scan_df_schema,
+                session_state,
+            )?;
+
+            if async_indices.is_empty() {
+                // All expressions are non-async - try to push the entire projection
+                let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+                    unnormalized_proj_exprs
+                        .iter()
+                        .map(|e| {
+                            let physical = self.create_physical_expr(
+                                e,
+                                &scan_df_schema,
+                                session_state,
+                            )?;
+                            let name = e.schema_name().to_string();
+                            Ok((physical, name))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+                    .into_iter()
+                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
+                    .collect();
+                let projection_exec =
+                    ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+                match plan.try_swapping_with_projection(&projection_exec)? {
+                    Some(optimized_plan) => {
+                        plan = optimized_plan;
+                    }
+                    None => {
+                        plan = Arc::new(projection_exec);
+                    }
+                }
+            } else if non_async_indices.is_empty() {
+                // All expressions are async - just create ProjectionExec with AsyncFuncExec
+                plan = self.create_projection_exec(
+                    &unnormalized_proj_exprs,
+                    plan,
+                    &scan_df_schema,
+                    session_state,
+                )?;
+            } else {
+                // Mixed: push non-async expressions + columns needed by async, keep async on top

Review Comment:
   Need to make sure we have test coverage that hits this
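
   For example, a minimal sketch of such a test (the async UDF `async_upper`, the table `t`, and its columns are placeholders, and registering them is elided since that setup isn't part of this diff):
   ```rust
   use datafusion::error::Result;
   use datafusion::physical_plan::displayable;
   use datafusion::prelude::SessionContext;

   // Rough shape of a test that hits the mixed branch: one async expression
   // plus one plain expression in the same projection over a table scan.
   async fn mixed_async_projection_plan(ctx: &SessionContext) -> Result<()> {
       // `async_upper` and `t` are assumed to be registered beforehand.
       let df = ctx.sql("SELECT async_upper(name), id + 1 FROM t").await?;
       let plan = df.create_physical_plan().await?;
       let display = displayable(plan.as_ref()).indent(true).to_string();
       // The sync part should end up pushed toward the scan while the async
       // part is evaluated on top (e.g. via AsyncFuncExec).
       assert!(display.contains("AsyncFuncExec"));
       Ok(())
   }
   ```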



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
             ))
         }
     }
+
+    /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+    ///
+    /// This method handles projection pushdown by:
+    /// 1. Computing which columns the scan needs to produce
+    /// 2. Creating the scan with minimal required columns
+    /// 3. Applying any remainder projection (for complex expressions)
+    /// 4. Attempting to push non-async expressions into the scan via `try_swapping_with_projection`
+    async fn plan_table_scan(
+        &self,
+        scan: &TableScan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove qualifiers from filters
+        let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+        // Compute required column indices and remainder projection
+        let (remainder_projection, scan_indices) =
+            self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(Some(&scan_indices))
+            .with_filters(if filters.is_empty() {
+                None
+            } else {
+                Some(&filters)
+            })
+            .with_limit(scan.fetch);
+
+        let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Wrap with ProjectionExec if remainder projection is needed
+        if let Some(ref proj_exprs) = remainder_projection {
+            let scan_output_schema = plan.schema();
+            let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+            let unnormalized_proj_exprs: Vec<Expr> =
+                unnormalize_cols(proj_exprs.iter().cloned());
+
+            // Classify expressions as async or non-async

Review Comment:
   Not sure if there's a better way to handle this. Async UDFs seem to be a bit of a pain. I wonder if there are existing helpers for this; if not, I think I can encapsulate this pattern of "split the projection into a top remainder and an inner one that matches a closure", since I see us using it a lot.
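
   To sketch what I mean (the helper and its name are made up, not an existing API): a generic function that partitions a projection against a closure and returns the inner projection to push down plus the remainder to keep on top, along the lines of:
   ```rust
   use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
   use datafusion_common::{Column, Result};
   use datafusion_expr::Expr;

   /// Hypothetical helper: split `exprs` into an inner projection (everything
   /// that satisfies `pushable`, plus the bare columns needed by the rest) and
   /// a remainder projection to evaluate on top of the inner one's output.
   fn split_projection<F>(exprs: &[Expr], mut pushable: F) -> Result<(Vec<Expr>, Vec<Expr>)>
   where
       F: FnMut(&Expr) -> bool,
   {
       let mut inner: Vec<Expr> = vec![];
       let mut remainder: Vec<Expr> = vec![];
       for expr in exprs {
           if pushable(expr) {
               // Pushed expressions are computed below; the remainder just
               // forwards their output column by name.
               inner.push(expr.clone());
               remainder.push(Expr::Column(Column::new_unqualified(
                   expr.schema_name().to_string(),
               )));
           } else {
               // Non-pushable expressions stay on top; make sure the inner
               // projection exposes every column they reference.
               expr.apply(|e| {
                   if let Expr::Column(c) = e {
                       let needed = Expr::Column(c.clone());
                       if !inner.contains(&needed) {
                           inner.push(needed);
                       }
                   }
                   Ok(TreeNodeRecursion::Continue)
               })?;
               remainder.push(expr.clone());
           }
       }
       Ok((inner, remainder))
   }
   ```
   The physical-planner-specific parts (creating `PhysicalExpr`s, calling `try_swapping_with_projection`) would then stay in the caller.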



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1815,11 +1815,16 @@ impl LogicalPlan {
                         ..
                     }) => {
                         let projected_fields = match projection {
-                            Some(indices) => {
-                                let schema = source.schema();
-                                let names: Vec<&str> = indices
+                            Some(exprs) => {
+                                let names: Vec<String> = exprs
                                     .iter()
-                                    .map(|i| schema.field(*i).name().as_str())
+                                    .map(|e| {
+                                        if let Expr::Column(col) = e {
+                                            col.name.clone()
+                                        } else {
+                                            e.schema_name().to_string()
+                                        }

Review Comment:
   Note: this is in the Display impl



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
             ))
         }
     }
+
+    /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+    ///
+    /// This method handles projection pushdown by:
+    /// 1. Computing which columns the scan needs to produce
+    /// 2. Creating the scan with minimal required columns
+    /// 3. Applying any remainder projection (for complex expressions)
+    /// 4. Attempting to push non-async expressions into the scan via `try_swapping_with_projection`
+    async fn plan_table_scan(
+        &self,
+        scan: &TableScan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove qualifiers from filters
+        let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+        // Compute required column indices and remainder projection
+        let (remainder_projection, scan_indices) =
+            self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(Some(&scan_indices))
+            .with_filters(if filters.is_empty() {
+                None
+            } else {
+                Some(&filters)
+            })
+            .with_limit(scan.fetch);
+
+        let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Wrap with ProjectionExec if remainder projection is needed
+        if let Some(ref proj_exprs) = remainder_projection {
+            let scan_output_schema = plan.schema();
+            let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+            let unnormalized_proj_exprs: Vec<Expr> =
+                unnormalize_cols(proj_exprs.iter().cloned());
+
+            // Classify expressions as async or non-async
+            let (async_indices, non_async_indices) = self.classify_projection_exprs(
+                &unnormalized_proj_exprs,
+                &scan_df_schema,
+                session_state,
+            )?;
+
+            if async_indices.is_empty() {
+                // All expressions are non-async - try to push the entire projection
+                let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+                    unnormalized_proj_exprs
+                        .iter()
+                        .map(|e| {
+                            let physical = self.create_physical_expr(
+                                e,
+                                &scan_df_schema,
+                                session_state,
+                            )?;
+                            let name = e.schema_name().to_string();
+                            Ok((physical, name))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+                    .into_iter()
+                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
+                    .collect();
+                let projection_exec =
+                    ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+                match plan.try_swapping_with_projection(&projection_exec)? {
+                    Some(optimized_plan) => {
+                        plan = optimized_plan;
+                    }
+                    None => {
+                        plan = Arc::new(projection_exec);
+                    }
+                }
+            } else if non_async_indices.is_empty() {
+                // All expressions are async - just create ProjectionExec with AsyncFuncExec
+                plan = self.create_projection_exec(
+                    &unnormalized_proj_exprs,
+                    plan,
+                    &scan_df_schema,
+                    session_state,
+                )?;
+            } else {
+                // Mixed: push non-async expressions + columns needed by async, keep async on top
+
+                // Collect columns needed by async expressions
+                let mut async_cols: HashSet<String> = HashSet::new();
+                for &idx in &async_indices {
+                    unnormalized_proj_exprs[idx].apply(|e| {
+                        if let Expr::Column(col) = e {
+                            async_cols.insert(col.name().to_string());
+                        }
+                        Ok(TreeNodeRecursion::Continue)
+                    })?;
+                }
+
+                // Build pushdown projection: non-async exprs + columns for async
+                let mut pushdown_exprs: Vec<Expr> = non_async_indices
+                    .iter()
+                    .map(|&i| unnormalized_proj_exprs[i].clone())
+                    .collect();
+
+                // Add column references for async expression dependencies
+                for col_name in &async_cols {
+                    if scan_output_schema.index_of(col_name).is_ok() {
+                        let col_expr =
+                            Expr::Column(Column::new_unqualified(col_name.clone()));
+                        // Only add if not already in pushdown_exprs
+                        if !pushdown_exprs.iter().any(|e| e == &col_expr) {
+                            pushdown_exprs.push(col_expr);
+                        }
+                    }
+                }
+
+                // Create and try to push the non-async projection
+                let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = pushdown_exprs
+                    .iter()
+                    .map(|e| {
+                        let physical =
+                            self.create_physical_expr(e, &scan_df_schema, session_state)?;
+                        let name = e.schema_name().to_string();
+                        Ok((physical, name))
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+                    .into_iter()
+                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
+                    .collect();
+                let projection_exec =
+                    ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+                match plan.try_swapping_with_projection(&projection_exec)? {
+                    Some(optimized_plan) => {
+                        plan = optimized_plan;
+                    }
+                    None => {
+                        plan = Arc::new(projection_exec);
+                    }
+                }
+
+                // Now apply the full projection (including async) on top
+                // The schema has changed, so we need a new DFSchema
+                let new_schema = plan.schema();
+                let new_df_schema = DFSchema::try_from(new_schema.as_ref().clone())?;
+
+                // The final projection needs to produce the original output
+                plan = self.create_projection_exec(
+                    &unnormalized_proj_exprs,
+                    plan,
+                    &new_df_schema,
+                    session_state,
+                )?;
+            }
+        }
+
+        Ok(plan)
+    }
+
+    /// Compute the column indices needed for the scan based on projection expressions.
+    ///
+    /// Returns a tuple of:
+    /// - `Option<Vec<Expr>>`: Remainder projection to apply on top of the scan output.
+    ///   `None` if the projection is all simple column references (reordering, dropping, etc.)
+    /// - `Vec<usize>`: Column indices to scan from the source.
+    fn compute_scan_projection(
+        &self,
+        projection: &Option<Vec<Expr>>,
+        source_schema: &Schema,
+    ) -> Result<(Option<Vec<Expr>>, Vec<usize>)> {
+        let Some(exprs) = projection else {
+            // None means scan all columns, no remainder needed
+            return Ok((None, (0..source_schema.fields().len()).collect()));
+        };
+
+        if exprs.is_empty() {
+            return Ok((None, vec![]));
+        }
+
+        let mut has_complex_expr = false;
+        let mut all_required_columns = BTreeSet::new();
+        let mut remainder_exprs = vec![];
+
+        for expr in exprs {
+            // Collect all column references from this expression
+            let mut is_complex_expr = false;
+            expr.apply(|e| {
+                if let Expr::Column(col) = e {
+                    if let Ok(index) = source_schema.index_of(col.name()) {
+                        // If we made it this far this must be the first level and the whole expression is a simple column reference
+                        // But we don't know if subsequent expressions might have more complex expressions necessitating `remainder_exprs`
+                        // to be populated, so we push to `remainder_exprs` just in case they are needed later.
+                        // It is simpler to do this now than to try to backtrack later since we already matched into Expr::Column
+                        // and thus can simply clone `expr` here.
+                        // If `is_complex_expr` is true then we will append the complex expression itself to `remainder_exprs` instead
+                        // later once we've fully traversed this expression.
+                        if !is_complex_expr {
+                            remainder_exprs.push(expr.clone());
+                        }
+                        all_required_columns.insert(index);
+                    }
+                } else {
+                    // Nothing to do here except note that we will have to append the full expression later
+                    is_complex_expr = true;
+                }
+                Ok(TreeNodeRecursion::Continue)
+            })?;
+            if is_complex_expr {
+                // If any expression in the projection is not a simple column reference we will need to apply a remainder projection
+                has_complex_expr = true;
+                // Append the full expression itself to the remainder expressions
+                // So given a projection like `[a, a + c, d]` we would have:
+                // all_required_columns = {0, 2, 3}
+                // original schema: [a: Int, b: Int, c: Int, d: Int]
+                // projected schema: [a: Int, c: Int, d: Int]
+                // remainder_exprs = [col(a), col(a) + col(c), col(d)]
+                remainder_exprs.push(expr.clone());
+            }
+        }
+
+        Ok((
+            has_complex_expr.then_some(remainder_exprs),
+            all_required_columns.into_iter().collect(),
+        ))
+    }
+
+    /// Classifies projection expressions into async and non-async.
+    ///
+    /// Returns `(async_expr_indices, non_async_expr_indices)` where:
+    /// - `async_expr_indices`: indices of expressions that contain async UDFs
+    /// - `non_async_expr_indices`: indices of expressions that are purely synchronous
+    fn classify_projection_exprs(
+        &self,
+        exprs: &[Expr],
+        input_dfschema: &DFSchema,
+        session_state: &SessionState,
+    ) -> Result<(Vec<usize>, Vec<usize>)> {
+        let mut async_indices = vec![];
+        let mut non_async_indices = vec![];
+        let schema = input_dfschema.as_arrow();
+
+        for (i, expr) in exprs.iter().enumerate() {
+            let physical =
+                self.create_physical_expr(expr, input_dfschema, session_state)?;
+
+            // Use AsyncMapper to check if expression has async UDFs
+            let mut async_map = AsyncMapper::new(schema.fields().len());
+            async_map.find_references(&physical, schema)?;
+
+            if async_map.is_empty() {
+                non_async_indices.push(i);
+            } else {
+                async_indices.push(i);
+            }
+        }
+
+        Ok((async_indices, non_async_indices))
+    }
+
+    /// Creates a ProjectionExec from logical expressions, handling async UDF expressions.
+    ///
+    /// If the expressions contain async UDFs, wraps them with `AsyncFuncExec`.
+    fn create_projection_exec(
+        &self,
+        exprs: &[Expr],
+        input: Arc<dyn ExecutionPlan>,
+        input_dfschema: &DFSchema,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
+            .iter()
+            .map(|e| {
+                let physical =
+                    self.create_physical_expr(e, input_dfschema, session_state)?;
+                let name = e.schema_name().to_string();
+                Ok((physical, name))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let num_input_columns = input.schema().fields().len();
+        let input_schema = input.schema();
+
+        match self.try_plan_async_exprs(

Review Comment:
   Need to investigate how `try_plan_async_exprs` works and how it overlaps with the projection splitting above.



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2807,14 +2829,129 @@ impl TableScan {
         Ok(Self {

Review Comment:
   Wonder if we could reuse TableScanBuilder here?



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
             ))
         }
     }
+
+    /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+    ///
+    /// This method handles projection pushdown by:
+    /// 1. Computing which columns the scan needs to produce
+    /// 2. Creating the scan with minimal required columns
+    /// 3. Applying any remainder projection (for complex expressions)
+    /// 4. Attempting to push non-async expressions into the scan via `try_swapping_with_projection`
+    async fn plan_table_scan(
+        &self,
+        scan: &TableScan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove qualifiers from filters
+        let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());
+
+        // Compute required column indices and remainder projection
+        let (remainder_projection, scan_indices) =
+            self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(Some(&scan_indices))
+            .with_filters(if filters.is_empty() {
+                None
+            } else {
+                Some(&filters)
+            })
+            .with_limit(scan.fetch);
+
+        let scan_result = provider.scan_with_args(session_state, scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Wrap with ProjectionExec if remainder projection is needed
+        if let Some(ref proj_exprs) = remainder_projection {
+            let scan_output_schema = plan.schema();
+            let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+            let unnormalized_proj_exprs: Vec<Expr> =
+                unnormalize_cols(proj_exprs.iter().cloned());
+
+            // Classify expressions as async or non-async
+            let (async_indices, non_async_indices) = self.classify_projection_exprs(
+                &unnormalized_proj_exprs,
+                &scan_df_schema,
+                session_state,
+            )?;
+
+            if async_indices.is_empty() {
+                // All expressions are non-async - try to push the entire projection
+                let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+                    unnormalized_proj_exprs
+                        .iter()
+                        .map(|e| {
+                            let physical = self.create_physical_expr(
+                                e,
+                                &scan_df_schema,
+                                session_state,
+                            )?;
+                            let name = e.schema_name().to_string();
+                            Ok((physical, name))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+                    .into_iter()
+                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
+                    .collect();
+                let projection_exec =
+                    ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+                match plan.try_swapping_with_projection(&projection_exec)? {

Review Comment:
   Might move this out of this PR. The idea is that once we have projections in `TableScan`, the projection pushdown optimizer can push `get_field` expressions all the way down into `TableScan` (past any filters), and then here we can push those into the scan itself (if the scan accepts them).
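
   For reference, a quick way to eyeball whether that pushdown happens is to print the physical plan for a struct-field projection; the table `t` and struct column `s` below are hypothetical and their registration is omitted:
   ```rust
   use datafusion::error::Result;
   use datafusion::physical_plan::displayable;
   use datafusion::prelude::SessionContext;

   async fn show_struct_field_plan(ctx: &SessionContext) -> Result<()> {
       // Bracket indexing on a struct column desugars to a get_field call.
       let df = ctx.sql("SELECT s['x'] FROM t WHERE s['y'] > 10").await?;
       let plan = df.create_physical_plan().await?;
       // Inspect whether the field accesses ended up above the filter/scan
       // or were pushed down into the scan itself.
       println!("{}", displayable(plan.as_ref()).indent(true));
       Ok(())
   }
   ```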



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

