adriangb commented on code in PR #20091:
URL: https://github.com/apache/datafusion/pull/20091#discussion_r2750153569


##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1725,6 +1708,324 @@ impl DefaultPhysicalPlanner {
             ))
         }
     }
+
+    /// Plan a TableScan node, wrapping with ProjectionExec as needed.
+    ///
+    /// This method handles projection pushdown by:
+    /// 1. Computing which columns the scan needs to produce
+    /// 2. Creating the scan with minimal required columns
+    /// 3. Applying any remainder projection (for complex expressions)
+    /// 4. Attempting to push non-async expressions into the scan via 
`try_swapping_with_projection`
+    async fn plan_table_scan(
+        &self,
+        scan: &TableScan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let provider = source_as_provider(&scan.source)?;
+        let source_schema = scan.source.schema();
+
+        // Remove qualifiers from filters
+        let filters: Vec<Expr> = 
unnormalize_cols(scan.filters.iter().cloned());
+
+        // Compute required column indices and remainder projection
+        let (remainder_projection, scan_indices) =
+            self.compute_scan_projection(&scan.projection, &source_schema)?;
+
+        // Create the scan
+        let scan_args = ScanArgs::default()
+            .with_projection(Some(&scan_indices))
+            .with_filters(if filters.is_empty() {
+                None
+            } else {
+                Some(&filters)
+            })
+            .with_limit(scan.fetch);
+
+        let scan_result = provider.scan_with_args(session_state, 
scan_args).await?;
+        let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());
+
+        // Wrap with ProjectionExec if remainder projection is needed
+        if let Some(ref proj_exprs) = remainder_projection {
+            let scan_output_schema = plan.schema();
+            let scan_df_schema = 
DFSchema::try_from(scan_output_schema.as_ref().clone())?;
+            let unnormalized_proj_exprs: Vec<Expr> =
+                unnormalize_cols(proj_exprs.iter().cloned());
+
+            // Classify expressions as async or non-async
+            let (async_indices, non_async_indices) = 
self.classify_projection_exprs(
+                &unnormalized_proj_exprs,
+                &scan_df_schema,
+                session_state,
+            )?;
+
+            if async_indices.is_empty() {
+                // All expressions are non-async - try to push the entire 
projection
+                let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+                    unnormalized_proj_exprs
+                        .iter()
+                        .map(|e| {
+                            let physical = self.create_physical_expr(
+                                e,
+                                &scan_df_schema,
+                                session_state,
+                            )?;
+                            let name = e.schema_name().to_string();
+                            Ok((physical, name))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                let proj_exprs: Vec<ProjectionExpr> = physical_exprs
+                    .into_iter()
+                    .map(|(expr, alias)| ProjectionExpr { expr, alias })
+                    .collect();
+                let projection_exec =
+                    ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?;
+
+                match plan.try_swapping_with_projection(&projection_exec)? {
+                    Some(optimized_plan) => {
+                        plan = optimized_plan;
+                    }
+                    None => {
+                        plan = Arc::new(projection_exec);
+                    }
+                }
+            } else if non_async_indices.is_empty() {
+                // All expressions are async - just create ProjectionExec with 
AsyncFuncExec
+                plan = self.create_projection_exec(
+                    &unnormalized_proj_exprs,
+                    plan,
+                    &scan_df_schema,
+                    session_state,
+                )?;
+            } else {
+                // Mixed: push non-async expressions + columns needed by 
async, keep async on top

Review Comment:
   We need to make sure we have test coverage that exercises this mixed 
async/non-async branch. More generally, we should run a full coverage check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to