adriangb commented on code in PR #19538:
URL: https://github.com/apache/datafusion/pull/19538#discussion_r2721556860


##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
     }
 }
 
+/// Tries to split a projection to extract beneficial sub-expressions for 
pushdown.
+///
+/// This function walks each expression in the projection and extracts 
beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial 
expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`, 
Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping 
non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(

Review Comment:
   Yes I would love to do that... but this is already quite complex on it's 
own. Given that these are internal refactors, maybe we can implement 
independently and then in a followup try to unite the efforts?



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1123,15 +1123,8 @@ impl ExecutionPlan for RepartitionExec {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        // If the projection does not narrow the schema, we should not try to 
push it down.
-        if projection.expr().len() >= 
projection.input().schema().fields().len() {
-            return Ok(None);
-        }
-
         // If pushdown is not beneficial or applicable, break it.
-        if projection.benefits_from_input_partitioning()[0]
-            || !all_columns(projection.expr())
-        {
+        if projection.benefits_from_input_partitioning()[0] {

Review Comment:
   Alas I tried to unify with the other checks and it produces a lot of slt 
changes. I reviewed some of them and I think the new plans were better, but 
even if we want to make that change I'd suggest we do it in a followup so we 
can review that independently. For this PR I'd like to keep the SLT changes to 
a minimum.



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1391,8 +1391,10 @@ impl ExecutionPlan for SortExec {
         &self,
         projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        // If the projection does not narrow the schema, we should not try to 
push it down.
-        if projection.expr().len() >= 
projection.input().schema().fields().len() {
+        // Only push projections that are trivial (column refs, field 
accessors) or

Review Comment:
   Yeah I think there's a lot we could do. Maybe even take into account 
statistics (e.g. column sizes)? And now we have a centralized place to do so. 
But I think that's all out of scope for this PR



##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
     }
 }
 
+/// Tries to split a projection to extract beneficial sub-expressions for 
pushdown.
+///
+/// This function walks each expression in the projection and extracts 
beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial 
expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`, 
Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping 
non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(
+    plan: Arc<dyn ExecutionPlan>,
+    alias_generator: &AliasGenerator,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() else 
{
+        return Ok(Transformed::no(plan));
+    };
+
+    let input_schema = projection.input().schema();
+    let mut extractor = TrivialExprExtractor::new(input_schema.as_ref(), 
alias_generator);
+
+    // Extract trivial sub-expressions from each projection expression
+    let mut outer_exprs = Vec::new();
+    let mut has_extractions = false;
+
+    for proj_expr in projection.expr() {
+        // If this is already an expression from an extraction don't try to 
re-extract it (would cause infinite recursion)
+        if proj_expr.alias.starts_with("__extracted") {
+            outer_exprs.push(proj_expr.clone());
+            continue;
+        }
+
+        // Only extract from non-trivial expressions. If the entire expression 
is
+        // already TrivialExpr (like `get_field(col, 'foo')`), it can be 
pushed as-is.
+        // We only need to split when there's a non-trivial expression with 
trivial
+        // sub-expressions (like `get_field(col, 'foo') + 1`).
+        if matches!(proj_expr.expr.triviality(), ArgTriviality::TrivialExpr) {
+            outer_exprs.push(proj_expr.clone());
+            continue;
+        }
+
+        let rewritten = extractor.extract(Arc::clone(&proj_expr.expr))?;
+        if !Arc::ptr_eq(&rewritten, &proj_expr.expr) {
+            has_extractions = true;
+        }
+        outer_exprs.push(ProjectionExpr::new(rewritten, 
proj_expr.alias.clone()));
+    }
+
+    if !has_extractions {
+        return Ok(Transformed::no(plan));
+    }
+
+    // Collect columns needed by outer expressions that aren't extracted
+    extractor.collect_columns_needed(&outer_exprs)?;
+
+    // Build inner projection from extracted expressions + needed columns
+    let inner_exprs = extractor.build_inner_projection()?;
+
+    if inner_exprs.is_empty() {
+        return Ok(Transformed::no(plan));
+    }
+
+    // Create the inner projection (to be pushed down)
+    let inner = ProjectionExec::try_new(inner_exprs, 
Arc::clone(projection.input()))?;
+
+    // Rewrite outer expressions to reference the inner projection's output 
schema
+    let inner_schema = inner.schema();
+    let final_outer_exprs = extractor.finalize_outer_exprs(outer_exprs, 
&inner_schema)?;
+
+    // Create the outer projection (stays above)
+    let outer = ProjectionExec::try_new(final_outer_exprs, Arc::new(inner))?;
+
+    Ok(Transformed::yes(Arc::new(outer)))
+}
+
+/// Extracts beneficial trivial sub-expressions from larger expressions.
+///
+/// Similar to `JoinFilterRewriter`, this struct walks expression trees 
top-down
+/// and extracts sub-expressions where `triviality() == 
ArgTriviality::TrivialExpr`
+/// (beneficial trivial expressions like field accessors).
+///
+/// The extracted expressions are replaced with column references pointing to
+/// an inner projection that computes these sub-expressions.
+struct TrivialExprExtractor<'a> {
+    /// Extracted trivial expressions: maps expression -> alias
+    extracted: IndexMap<Arc<dyn PhysicalExpr>, String>,
+    /// Columns needed by outer expressions: maps input column index -> alias
+    columns_needed: IndexMap<usize, String>,
+    /// Input schema for the projection
+    input_schema: &'a Schema,
+    /// Alias generator for unique names
+    alias_generator: &'a AliasGenerator,
+}
+
+impl<'a> TrivialExprExtractor<'a> {
+    fn new(input_schema: &'a Schema, alias_generator: &'a AliasGenerator) -> 
Self {
+        Self {
+            extracted: IndexMap::new(),
+            columns_needed: IndexMap::new(),
+            input_schema,
+            alias_generator,
+        }
+    }
+
+    /// Extracts beneficial trivial sub-expressions from the given expression.
+    ///
+    /// Walks the expression tree top-down and replaces beneficial trivial
+    /// sub-expressions with column references to the inner projection.
+    fn extract(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn 
PhysicalExpr>> {

Review Comment:
   I believe in this case it was because I had trouble avoiding infinite 
recursions. The tree node API doesn't allow "skip to this node" or "skip over 
this node" just Continue, Stop or Jump (which jumps to a sibling and skips the 
entire subtree). I think in this case I needed more complex semantics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to