adriangb commented on code in PR #19538:
URL: https://github.com/apache/datafusion/pull/19538#discussion_r2721556860
##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
}
}
+/// Tries to split a projection to extract beneficial sub-expressions for pushdown.
+///
+/// This function walks each expression in the projection and extracts beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`, Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(
Review Comment:
Yes, I would love to do that... but this is already quite complex on its
own. Given that these are internal refactors, maybe we can implement them
independently and then try to unite the efforts in a follow-up?
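A minimal sketch of the split described in the `try_split_projection` doc comment, assuming a hypothetical toy `Expr` enum (not DataFusion's `PhysicalExpr`) in which `GetField` is the only beneficial kind of sub-expression. The helpers `extract`, `columns`, and `split` are illustrative names only. Passing through the columns the outer expression still reads follows the diff's "Collect columns needed by outer expressions that aren't extracted" step, but how the real extractor does it is an assumption here.

```rust
use std::fmt;

// Hypothetical toy expression tree; not DataFusion's `PhysicalExpr`.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    GetField(Box<Expr>, String),
    Add(Box<Expr>, Box<Expr>),
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column(name) => write!(f, "{name}"),
            Expr::Literal(v) => write!(f, "{v}"),
            Expr::GetField(base, field) => write!(f, "get_field({base}, '{field}')"),
            Expr::Add(l, r) => write!(f, "{l} + {r}"),
        }
    }
}

/// A projection is a list of `(expression, alias)` pairs.
type Projection = Vec<(Expr, String)>;

/// Replaces each `get_field` sub-expression with a reference to a fresh
/// `__extracted_N` column and records it for the inner projection.
fn extract(expr: &Expr, inner: &mut Projection) -> Expr {
    match expr {
        Expr::GetField(..) => {
            let alias = format!("__extracted_{}", inner.len());
            inner.push((expr.clone(), alias.clone()));
            Expr::Column(alias)
        }
        Expr::Add(l, r) => Expr::Add(Box::new(extract(l, inner)), Box::new(extract(r, inner))),
        Expr::Column(_) | Expr::Literal(_) => expr.clone(),
    }
}

/// Collects the column names an expression reads, without duplicates.
fn columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => {
            if !out.contains(name) {
                out.push(name.clone());
            }
        }
        Expr::GetField(base, _) => columns(base, out),
        Expr::Add(l, r) => {
            columns(l, out);
            columns(r, out);
        }
        Expr::Literal(_) => {}
    }
}

/// Splits `expr AS alias` into an inner projection (to push down) and an
/// outer projection (to keep above). Returns `None` if there is nothing to split.
fn split(expr: &Expr, alias: &str) -> Option<(Projection, Projection)> {
    // An expression that is already a field accessor can be pushed down as-is.
    if matches!(expr, Expr::GetField(..)) {
        return None;
    }
    let mut inner = Projection::new();
    let outer_expr = extract(expr, &mut inner);
    if inner.is_empty() {
        return None;
    }
    // The outer expression may still read original columns; pass them through.
    let mut needed = Vec::new();
    columns(&outer_expr, &mut needed);
    for name in needed {
        if !inner.iter().any(|(_, a)| *a == name) {
            inner.push((Expr::Column(name.clone()), name));
        }
    }
    Some((inner, vec![(outer_expr, alias.to_string())]))
}
```

For `get_field(col, 'foo') + 1 AS x`, `split` yields the inner projection `get_field(col, 'foo') AS __extracted_0` and the outer projection `__extracted_0 + 1 AS x`, matching the doc comment's example.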
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1123,15 +1123,8 @@ impl ExecutionPlan for RepartitionExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to push it down.
- if projection.expr().len() >= projection.input().schema().fields().len() {
- return Ok(None);
- }
-
// If pushdown is not beneficial or applicable, break it.
- if projection.benefits_from_input_partitioning()[0]
- || !all_columns(projection.expr())
- {
+ if projection.benefits_from_input_partitioning()[0] {
Review Comment:
Alas, I tried to unify this with the other checks and it produces a lot of SLT
changes. I reviewed some of them and I think the new plans were better, but
even if we want to make that change, I'd suggest we do it in a follow-up so we
can review it independently. For this PR I'd like to keep the SLT changes to
a minimum.
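To make the behavioral difference in this hunk concrete, here is a sketch that reduces the gate to booleans. `ProjectionFacts`, `old_allows_swap`, and `new_allows_swap` are hypothetical; they model only the conditions visible in the diff, not anything else the real method may check after this point.

```rust
/// Hypothetical summary of the facts the hunk reads from `ProjectionExec`.
#[derive(Clone, Copy, Debug)]
struct ProjectionFacts {
    num_exprs: usize,
    num_input_fields: usize,
    benefits_from_input_partitioning: bool,
    all_columns: bool,
}

/// Gate before the change: bail out if the projection does not narrow the
/// schema, benefits from input partitioning, or is not all column references.
fn old_allows_swap(p: ProjectionFacts) -> bool {
    if p.num_exprs >= p.num_input_fields {
        return false;
    }
    !(p.benefits_from_input_partitioning || !p.all_columns)
}

/// Gate after the change: only the input-partitioning check remains.
fn new_allows_swap(p: ProjectionFacts) -> bool {
    !p.benefits_from_input_partitioning
}
```

A projection that keeps the schema width and contains a field accessor is rejected by the old gate but accepted by the new one.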
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1391,8 +1391,10 @@ impl ExecutionPlan for SortExec {
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- // If the projection does not narrow the schema, we should not try to push it down.
- if projection.expr().len() >= projection.input().schema().fields().len() {
+ // Only push projections that are trivial (column refs, field accessors) or
Review Comment:
Yeah, I think there's a lot we could do. Maybe even take statistics (e.g.
column sizes) into account? And now we have a centralized place to do so.
But I think that's all out of scope for this PR.
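The new comment in the `SortExec` hunk names column references and field accessors as trivial; the rest of that condition is cut off in this excerpt. A sketch of just that classification follows, over a hypothetical toy expression type. Treating a field accessor as trivial only when its input is itself trivial is an assumption made for the sketch.

```rust
/// Hypothetical toy expression tree (not DataFusion's `PhysicalExpr`).
#[allow(dead_code)]
enum Expr {
    Column(usize),
    GetField(Box<Expr>, String),
    Add(Box<Expr>, Box<Expr>),
}

/// Column references, and field accessors over trivial inputs, are trivial;
/// computations such as `Add` are not.
fn is_trivial(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) => true,
        Expr::GetField(base, _) => is_trivial(base),
        Expr::Add(..) => false,
    }
}

/// The "trivial" half of the pushdown condition: every projected expression is trivial.
fn all_trivial(exprs: &[Expr]) -> bool {
    exprs.iter().all(is_trivial)
}
```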
##########
datafusion/physical-optimizer/src/projection_pushdown.rs:
##########
@@ -88,6 +98,267 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
}
}
+/// Tries to split a projection to extract beneficial sub-expressions for pushdown.
+///
+/// This function walks each expression in the projection and extracts beneficial
+/// sub-expressions (like `get_field`) from within larger non-beneficial expressions.
+/// For example:
+/// - Input: `get_field(col, 'foo') + 1`
+/// - Output: Inner projection: `get_field(col, 'foo') AS __extracted_0`, Outer: `__extracted_0 + 1`
+///
+/// This enables the beneficial parts to be pushed down while keeping non-beneficial
+/// expressions (like literals and computations) above.
+fn try_split_projection(
+ plan: Arc<dyn ExecutionPlan>,
+ alias_generator: &AliasGenerator,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+ let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() else {
+ return Ok(Transformed::no(plan));
+ };
+
+ let input_schema = projection.input().schema();
+ let mut extractor = TrivialExprExtractor::new(input_schema.as_ref(), alias_generator);
+
+ // Extract trivial sub-expressions from each projection expression
+ let mut outer_exprs = Vec::new();
+ let mut has_extractions = false;
+
+ for proj_expr in projection.expr() {
+ // If this is already an expression from an extraction don't try to re-extract it (would cause infinite recursion)
+ if proj_expr.alias.starts_with("__extracted") {
+ outer_exprs.push(proj_expr.clone());
+ continue;
+ }
+
+ // Only extract from non-trivial expressions. If the entire expression is
+ // already TrivialExpr (like `get_field(col, 'foo')`), it can be pushed as-is.
+ // We only need to split when there's a non-trivial expression with trivial
+ // sub-expressions (like `get_field(col, 'foo') + 1`).
+ if matches!(proj_expr.expr.triviality(), ArgTriviality::TrivialExpr) {
+ outer_exprs.push(proj_expr.clone());
+ continue;
+ }
+
+ let rewritten = extractor.extract(Arc::clone(&proj_expr.expr))?;
+ if !Arc::ptr_eq(&rewritten, &proj_expr.expr) {
+ has_extractions = true;
+ }
+ outer_exprs.push(ProjectionExpr::new(rewritten, proj_expr.alias.clone()));
+ }
+
+ if !has_extractions {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Collect columns needed by outer expressions that aren't extracted
+ extractor.collect_columns_needed(&outer_exprs)?;
+
+ // Build inner projection from extracted expressions + needed columns
+ let inner_exprs = extractor.build_inner_projection()?;
+
+ if inner_exprs.is_empty() {
+ return Ok(Transformed::no(plan));
+ }
+
+ // Create the inner projection (to be pushed down)
+ let inner = ProjectionExec::try_new(inner_exprs, Arc::clone(projection.input()))?;
+
+ // Rewrite outer expressions to reference the inner projection's output schema
+ let inner_schema = inner.schema();
+ let final_outer_exprs = extractor.finalize_outer_exprs(outer_exprs, &inner_schema)?;
+
+ // Create the outer projection (stays above)
+ let outer = ProjectionExec::try_new(final_outer_exprs, Arc::new(inner))?;
+
+ Ok(Transformed::yes(Arc::new(outer)))
+}
+
+/// Extracts beneficial trivial sub-expressions from larger expressions.
+///
+/// Similar to `JoinFilterRewriter`, this struct walks expression trees top-down
+/// and extracts sub-expressions where `triviality() == ArgTriviality::TrivialExpr`
+/// (beneficial trivial expressions like field accessors).
+///
+/// The extracted expressions are replaced with column references pointing to
+/// an inner projection that computes these sub-expressions.
+struct TrivialExprExtractor<'a> {
+ /// Extracted trivial expressions: maps expression -> alias
+ extracted: IndexMap<Arc<dyn PhysicalExpr>, String>,
+ /// Columns needed by outer expressions: maps input column index -> alias
+ columns_needed: IndexMap<usize, String>,
+ /// Input schema for the projection
+ input_schema: &'a Schema,
+ /// Alias generator for unique names
+ alias_generator: &'a AliasGenerator,
+}
+
+impl<'a> TrivialExprExtractor<'a> {
+ fn new(input_schema: &'a Schema, alias_generator: &'a AliasGenerator) -> Self {
+ Self {
+ extracted: IndexMap::new(),
+ columns_needed: IndexMap::new(),
+ input_schema,
+ alias_generator,
+ }
+ }
+
+ /// Extracts beneficial trivial sub-expressions from the given expression.
+ ///
+ /// Walks the expression tree top-down and replaces beneficial trivial
+ /// sub-expressions with column references to the inner projection.
+ fn extract(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
Review Comment:
I believe in this case it was because I had trouble avoiding infinite
recursion. The tree node API doesn't allow "skip to this node" or "skip over
this node"; it only offers Continue, Stop, or Jump (which skips the entire
subtree and moves on to the next sibling). I think in this case I needed more
complex semantics.
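A sketch of the kind of hand-written top-down walk the reply describes, assuming a hypothetical toy `Expr` built from `Arc`s; this is not the PR's actual `extract` body, which the excerpt does not show. When the walk reaches a `get_field` node it replaces the whole subtree with a column reference and does not descend into it. Any other node recurses into its children and is rebuilt only if a child changed, so an untouched tree comes back as the same `Arc`, the property an `Arc::ptr_eq` check like the one in `try_split_projection` depends on. A `Vec` lookup stands in for the `IndexMap<Arc<dyn PhysicalExpr>, String>` field so that equal sub-expressions share one alias.

```rust
use std::sync::Arc;

/// Hypothetical toy expression tree (not DataFusion's `PhysicalExpr`).
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    GetField(Arc<Expr>, String),
    Add(Arc<Expr>, Arc<Expr>),
}

/// Records each extracted sub-expression together with its alias.
struct Extractor {
    extracted: Vec<(Arc<Expr>, String)>,
}

impl Extractor {
    fn extract(&mut self, expr: Arc<Expr>) -> Arc<Expr> {
        match &*expr {
            // Trivial subtree: replace it wholesale and skip its children.
            Expr::GetField(..) => {
                // Reuse the alias if an equal expression was already extracted.
                let found = self.extracted.iter().position(|(e, _)| **e == *expr);
                let alias = match found {
                    Some(i) => self.extracted[i].1.clone(),
                    None => {
                        let alias = format!("__extracted_{}", self.extracted.len());
                        self.extracted.push((Arc::clone(&expr), alias.clone()));
                        alias
                    }
                };
                Arc::new(Expr::Column(alias))
            }
            // Non-trivial node: recurse, rebuilding only if a child changed.
            Expr::Add(l, r) => {
                let new_l = self.extract(Arc::clone(l));
                let new_r = self.extract(Arc::clone(r));
                if Arc::ptr_eq(&new_l, l) && Arc::ptr_eq(&new_r, r) {
                    Arc::clone(&expr) // unchanged: hand back the same allocation
                } else {
                    Arc::new(Expr::Add(new_l, new_r))
                }
            }
            Expr::Column(_) | Expr::Literal(_) => Arc::clone(&expr),
        }
    }
}
```

Writing the recursion by hand puts "replace this node and do not visit its children" into a single `match` arm.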
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.