This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-22534-2fc3b1dff95d729cdf6e912833981c87d5a30b03
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit bd33e6f66d13f40a7940aa1790673b7028791953
Author: Neil Conway <[email protected]>
AuthorDate: Thu May 28 16:37:21 2026 -0400

    perf: Handle intermediate `Projection` nodes in `EliminateOuterJoin` (#22534)
    
    ## Which issue does this PR close?
    
    - Closes #22531.
    
    ## Rationale for this change
    
    `EliminateOuterJoin` looks for plans with a `Filter` directly above a
    `Join`. For most queries, that is the right plan shape, because
    `PushDownFilter` will typically place the filters that are useful for
    outer join elimination directly on top of the relevant `Join`. However,
    some plans don't follow this shape, for at least two reasons:
    
    1. Volatile expressions can interfere with filter pushdown
    2. `OptimizeProjections` might insert a `Projection` between the
    `Filter` and `Join`
    
    Notably, we run into case (2) in TPC-DS Q49; we currently fail to
    convert three outer joins to inner joins for that reason.
    
    We can handle this by teaching `EliminateOuterJoin` to descend through
    one or more intermediate `Projection` nodes, rewriting the filter
    predicate as it goes to account for the effect of the projection.
    
    ## What changes are included in this PR?
    
    * Teach `EliminateOuterJoin` to descend through one or more
    `Projection` nodes
    * Refactor various code in `eliminate_outer_join.rs`, improve comments
    * Add unit tests
    * Add SLT tests
    
    ## Are these changes tested?
    
    Yes, new tests added. Manually verified that we fail to eliminate the
    outer joins in TPC-DS Q49 without this change and succeed in doing so
    with this change.
    
    ## Are there any user-facing changes?
    
    More effective outer join query optimization.
---
 datafusion/optimizer/src/eliminate_outer_join.rs   | 388 ++++++++++++++++-----
 .../test_files/eliminate_outer_join.slt            | 107 ++++++
 2 files changed, 405 insertions(+), 90 deletions(-)

diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index 748b04d5cf..4691eaf48b 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -15,39 +15,66 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins
+//! [`EliminateOuterJoin`] rewrites outer joins to simpler join types when
+//! filters make the outer rows unnecessary (e.g. `LEFT`/`RIGHT` to `INNER`,
+//! and `FULL` to `LEFT`/`RIGHT`/`INNER`).
+use crate::push_down_filter::replace_cols_by_name;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{Column, DFSchema, Result};
-use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan};
+use datafusion_common::{Column, DFSchema, Result, qualified_name};
+use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, Projection};
 use datafusion_expr::{Expr, Filter, Operator};
 
 use crate::optimizer::ApplyOrder;
 use datafusion_common::tree_node::Transformed;
 use datafusion_expr::expr::{BinaryExpr, Cast, InList, Like, TryCast};
+use std::collections::HashMap;
 use std::sync::Arc;
 
+/// Attempt to simplify outer joins when filters make their null-padded
+/// rows impossible to observe.
 ///
-/// Attempt to replace outer joins with inner joins.
+/// Outer joins are generally more expensive than inner joins and can block
+/// predicate pushdown and other optimizations. When a filter above an outer
+/// join removes every row the join would add for unmatched input rows, the
+/// join can be changed to a cheaper join type.
 ///
-/// Outer joins are typically more expensive to compute at runtime
-/// than inner joins and prevent various forms of predicate pushdown
-/// and other optimizations, so removing them if possible is beneficial.
+/// For example:
 ///
-/// Inner joins filter out rows that do match. Outer joins pass rows
-/// that do not match padded with nulls. If there is a filter in the
-/// query that would filter any such null rows after the join the rows
-/// introduced by the outer join are filtered.
+/// ```sql
+/// SELECT ...
+/// FROM a LEFT JOIN b ON ...
+/// WHERE b.xx = 100
+/// ```
 ///
-/// For example, in the `select ... from a left join b on ... where b.xx = 100;`
+/// For unmatched rows from `a`, the LEFT JOIN would produce a row with
+/// `b.xx` set to NULL. The predicate `b.xx = 100` does not pass for those
+/// rows, so the query does not need the LEFT JOIN's null-padded output and
+/// the join can be rewritten as an inner join.
 ///
-/// For rows when `b.xx` is null (as it would be after an outer join),
-/// the `b.xx = 100` predicate filters them out and there is no
-/// need to produce null rows for output.
+/// The same reasoning can also simplify FULL joins to LEFT, RIGHT, or INNER
+/// joins when filters remove the rows padded on one or both sides.
 ///
-/// Generally, an outer join can be rewritten to inner join if the
-/// filters from the WHERE clause return false while any inputs are
-/// null and columns of those quals are come from nullable side of
-/// outer join.
+/// This rule looks for a filter above an outer join:
+///
+/// ```text
+/// Filter(predicate)
+///   Join(LEFT/RIGHT/FULL)
+/// ```
+///
+/// It also handles plan shapes where projection pruning has inserted one or
+/// more Projection nodes between the filter and join:
+///
+/// ```text
+/// Filter(predicate over projection output)
+///   Projection(...)
+///     ...
+///       Join(LEFT/RIGHT/FULL)
+/// ```
+///
+/// In the projection case, the rule rewrites a copy of the predicate through
+/// each Projection so it can analyze the predicate against the Join inputs.
+/// The original filter predicate and Projection nodes are preserved when the
+/// plan is rebuilt.
 #[derive(Default, Debug)]
 pub struct EliminateOuterJoin;
 
@@ -77,61 +104,137 @@ impl OptimizerRule for EliminateOuterJoin {
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Filter(mut filter) => match Arc::unwrap_or_clone(filter.input) {
+        let LogicalPlan::Filter(filter) = plan else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // Descend through one or more Projection nodes until we find a Join.
+        // For each Projection we encounter, rewrite a working copy of the
+        // predicate by replacing references to projection output columns with
+        // the expressions that define them. Keep the filter's original
+        // predicate intact for eventual use in the rebuilt plan; the rewritten
+        // predicate is used only for the null-rejection analysis.
+        let mut rewritten_predicate = filter.predicate.clone();
+        let mut projections: Vec<Projection> = Vec::new();
+        let mut cur = Arc::clone(&filter.input);
+
+        let new_join = loop {
+            match cur.as_ref() {
+                LogicalPlan::Projection(p) => {
+                    rewritten_predicate =
+                        inline_through_projection(rewritten_predicate, p)?;
+                    let next = Arc::clone(&p.input);
+                    projections.push(p.clone());
+                    cur = next;
+                }
                 LogicalPlan::Join(join) => {
-                    let mut null_rejecting_cols: Vec<Column> = vec![];
-
-                    extract_null_rejecting_columns(
-                        &filter.predicate,
-                        &mut null_rejecting_cols,
-                        join.left.schema(),
-                        join.right.schema(),
-                        true,
-                    );
-
-                    let new_join_type = if join.join_type.is_outer() {
-                        let mut left_non_nullable = false;
-                        let mut right_non_nullable = false;
-                        for col in null_rejecting_cols.iter() {
-                            if join.left.schema().has_column(col) {
-                                left_non_nullable = true;
-                            }
-                            if join.right.schema().has_column(col) {
-                                right_non_nullable = true;
-                            }
-                        }
-                        eliminate_outer(
-                            join.join_type,
-                            left_non_nullable,
-                            right_non_nullable,
-                        )
-                    } else {
-                        join.join_type
+                    let Some(new_join) = try_simplify_join(join, &rewritten_predicate)
+                    else {
+                        return Ok(Transformed::no(LogicalPlan::Filter(filter)));
                     };
-
-                    let new_join = Arc::new(LogicalPlan::Join(Join {
-                        left: join.left,
-                        right: join.right,
-                        join_type: new_join_type,
-                        join_constraint: join.join_constraint,
-                        on: join.on.clone(),
-                        filter: join.filter.clone(),
-                        schema: Arc::clone(&join.schema),
-                        null_equality: join.null_equality,
-                        null_aware: join.null_aware,
-                    }));
-                    Filter::try_new(filter.predicate, new_join)
-                        .map(|f| Transformed::yes(LogicalPlan::Filter(f)))
+                    break new_join;
                 }
-                filter_input => {
-                    filter.input = Arc::new(filter_input);
-                    Ok(Transformed::no(LogicalPlan::Filter(filter)))
+                _ => {
+                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
                 }
-            },
-            _ => Ok(Transformed::no(plan)),
+            }
+        };
+
+        let rebuilt_inner = rewrap_projections(new_join, projections);
+        Filter::try_new(filter.predicate, Arc::new(rebuilt_inner))
+            .map(|f| Transformed::yes(LogicalPlan::Filter(f)))
+    }
+}
+
+/// Run the null-rejection analysis on `predicate` against `join`'s left/right
+/// schemas. Return `Some(new_join_plan)` if the join type can be tightened
+/// (e.g. LEFT → INNER), `None` otherwise.
+fn try_simplify_join(join: &Join, predicate: &Expr) -> Option<LogicalPlan> {
+    if !join.join_type.is_outer() {
+        return None;
+    }
+
+    let mut null_rejecting_cols: Vec<Column> = vec![];
+    extract_null_rejecting_columns(
+        predicate,
+        &mut null_rejecting_cols,
+        join.left.schema(),
+        join.right.schema(),
+        true,
+    );
+
+    let mut left_non_nullable = false;
+    let mut right_non_nullable = false;
+    for col in null_rejecting_cols.iter() {
+        if join.left.schema().has_column(col) {
+            left_non_nullable = true;
+        }
+        if join.right.schema().has_column(col) {
+            right_non_nullable = true;
         }
     }
+
+    let new_join_type =
+        eliminate_outer(join.join_type, left_non_nullable, right_non_nullable);
+    if new_join_type == join.join_type {
+        return None;
+    }
+
+    Some(LogicalPlan::Join(Join {
+        left: Arc::clone(&join.left),
+        right: Arc::clone(&join.right),
+        join_type: new_join_type,
+        join_constraint: join.join_constraint,
+        on: join.on.clone(),
+        filter: join.filter.clone(),
+        schema: Arc::clone(&join.schema),
+        null_equality: join.null_equality,
+        null_aware: join.null_aware,
+    }))
+}
+
+/// Substitute the projection's output column references in `predicate` with
+/// the projection's defining expressions (stripped of any `Alias` wrapper).
+/// The result expresses `predicate` over the projection's *input* schema.
+///
+/// Unlike `PushDownFilter`, this rule does not change expression evaluation
+/// behavior (in fact, the rewritten expressions are only used for analysis
+/// purposes). Therefore, function volatility and `MoveTowardsLeafNodes`
+/// placement can be ignored here.
+fn inline_through_projection(predicate: Expr, p: &Projection) -> Result<Expr> {
+    let mut map: HashMap<String, Expr> = HashMap::new();
+    for ((qualifier, field), expr) in p.schema.iter().zip(p.expr.iter()) {
+        map.insert(
+            qualified_name(qualifier, field.name()),
+            unalias(expr).clone(),
+        );
+    }
+    replace_cols_by_name(predicate, &map)
+}
+
+/// Re-attach a stack of projections above `new_inner`, restoring the original
+/// plan shape with the new (possibly retyped) join at the bottom. Projection
+/// schemas are reused as-is; only nullability of columns sourced from the
+/// formerly-outer side may have changed, and the existing rule already takes
+/// this looser-schema approach at the join itself.
+fn rewrap_projections(
+    new_inner: LogicalPlan,
+    projections: Vec<Projection>,
+) -> LogicalPlan {
+    let mut current = new_inner;
+    for mut p in projections.into_iter().rev() {
+        p.input = Arc::new(current);
+        current = LogicalPlan::Projection(p);
+    }
+    current
+}
+
+fn unalias(expr: &Expr) -> &Expr {
+    if let Expr::Alias(a) = expr {
+        unalias(&a.expr)
+    } else {
+        expr
+    }
 }
 
 pub fn eliminate_outer(
@@ -139,28 +242,14 @@ pub fn eliminate_outer(
     left_non_nullable: bool,
     right_non_nullable: bool,
 ) -> JoinType {
-    let mut new_join_type = join_type;
-    match join_type {
-        JoinType::Left if right_non_nullable => {
-            new_join_type = JoinType::Inner;
-        }
-        JoinType::Left => {}
-        JoinType::Right if left_non_nullable => {
-            new_join_type = JoinType::Inner;
-        }
-        JoinType::Right => {}
-        JoinType::Full => {
-            if left_non_nullable && right_non_nullable {
-                new_join_type = JoinType::Inner;
-            } else if left_non_nullable {
-                new_join_type = JoinType::Left;
-            } else if right_non_nullable {
-                new_join_type = JoinType::Right;
-            }
-        }
-        _ => {}
+    match (join_type, left_non_nullable, right_non_nullable) {
+        (JoinType::Left, _, true) => JoinType::Inner,
+        (JoinType::Right, true, _) => JoinType::Inner,
+        (JoinType::Full, true, true) => JoinType::Inner,
+        (JoinType::Full, true, false) => JoinType::Left,
+        (JoinType::Full, false, true) => JoinType::Right,
+        _ => join_type,
     }
-    new_join_type
 }
 
 /// Find the columns that `expr` rejects NULL on. If any of these columns are
@@ -1251,6 +1340,97 @@ mod tests {
         ")
     }
 
+    // ----- Filter pierces a Projection to reach the Join -----
+
+    #[test]
+    fn eliminate_left_through_projection() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        // Filter → Projection → LeftJoin is the shape produced by projection
+        // pruning in queries such as TPC-DS q49, where the post-join
+        // Projection sits between the filter and the join.
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Left,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .project(vec![col("t1.a"), col("t2.b").alias("bb")])?
+            .filter(col("bb").gt(lit(10u32)))?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Filter: bb > UInt32(10)
+          Projection: t1.a, t2.b AS bb
+            Inner Join: t1.a = t2.a
+              TableScan: t1
+              TableScan: t2
+        ")
+    }
+
+    #[test]
+    fn no_eliminate_left_through_projection_with_or_cross_side() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        // After inlining the filter is still t1.b > 10 OR t2.b < 20, which
+        // is null-tolerant when t2 is NULL (the t1.b clause can still hold).
+        // The LEFT JOIN must be preserved.
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Left,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .project(vec![col("t1.b").alias("x"), col("t2.b").alias("y")])?
+            .filter(binary_expr(
+                col("x").gt(lit(10u32)),
+                Or,
+                col("y").lt(lit(20u32)),
+            ))?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Filter: x > UInt32(10) OR y < UInt32(20)
+          Projection: t1.b AS x, t2.b AS y
+            Left Join: t1.a = t2.a
+              TableScan: t1
+              TableScan: t2
+        ")
+    }
+
+    #[test]
+    fn no_eliminate_left_through_projection_with_only_left_filter() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        // A filter that constrains only the preserved (left) side of a
+        // LEFT JOIN does not justify converting it to INNER — the LEFT
+        // would still pass nullable right-side rows that the filter
+        // accepts.
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Left,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .project(vec![col("t1.b").alias("x"), col("t2.b")])?
+            .filter(col("x").gt(lit(10u32)))?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Filter: x > UInt32(10)
+          Projection: t1.b AS x, t2.b
+            Left Join: t1.a = t2.a
+              TableScan: t1
+              TableScan: t2
+        ")
+    }
+
     #[test]
     fn eliminate_left_with_arithmetic_predicate() -> Result<()> {
         let t1 = test_table_scan_with_name("t1")?;
@@ -1283,7 +1463,6 @@ mod tests {
             TableScan: t2
         ")
     }
-
     #[test]
     fn eliminate_left_with_negative_predicate() -> Result<()> {
         let t1 = test_table_scan_with_name("t1")?;
@@ -1368,4 +1547,33 @@ mod tests {
             TableScan: t2
         ")
     }
+
+    #[test]
+    fn no_eliminate_through_non_transparent() -> Result<()> {
+        let t1 = test_table_scan_with_name("t1")?;
+        let t2 = test_table_scan_with_name("t2")?;
+
+        // Limit is intentionally not treated as transparent: a Limit below
+        // the Filter changes which rows survive, so swapping LEFT→INNER
+        // beneath it could yield a different surviving-row set even when
+        // the filter is null-rejecting on the right side.
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Left,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .limit(0, Some(5))?
+            .filter(col("t2.b").gt(lit(10u32)))?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Filter: t2.b > UInt32(10)
+          Limit: skip=0, fetch=5
+            Left Join: t1.a = t2.a
+              TableScan: t1
+              TableScan: t2
+        ")
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/eliminate_outer_join.slt b/datafusion/sqllogictest/test_files/eliminate_outer_join.slt
index d22a7f2e3c..584d8af419 100644
--- a/datafusion/sqllogictest/test_files/eliminate_outer_join.slt
+++ b/datafusion/sqllogictest/test_files/eliminate_outer_join.slt
@@ -538,6 +538,110 @@ select * from t1 left join t2 on t1.a = t2.x where (t2.y > 150) is true and t2.z
 ----
 2 20 b 2 200 q
 
+###
+### Projection between Filter and Join
+###
+
+# A filter on a volatile, projected expression can still be used for outer join
+# elimination.
+query TT
+explain
+select s.a
+from (
+  select t1.a, random() + cast(t2.y as double) as ry
+  from t1 left join t2 on t1.a = t2.x
+) s
+where s.ry > 150.0;
+----
+logical_plan
+01)SubqueryAlias: s
+02)--Projection: t1.a
+03)----Filter: ry > Float64(150)
+04)------Projection: t1.a, random() + CAST(t2.y AS Float64) AS ry
+05)--------Inner Join: t1.a = t2.x
+06)----------TableScan: t1 projection=[a]
+07)----------TableScan: t2 projection=[x, y]
+
+query I rowsort
+select s.a
+from (
+  select t1.a, random() + cast(t2.y as double) as ry
+  from t1 left join t2 on t1.a = t2.x
+) s
+where s.ry > 150.0;
+----
+2
+
+# This query has the shape of TPC-DS Q49: `OptimizeProjections` results in
+# placing a `Projection` node between the `Filter` and `Join`, but we can look
+# through that node to convert the outer join.
+statement ok
+create table d(k int, flag int);
+
+statement ok
+insert into d values (1, 1), (2, 1), (3, 0);
+
+query TT
+explain
+select t1.a, sum(coalesce(t2.y, 0)) as ret_sum
+from t1 left join t2 on t1.a = t2.x, d
+where t2.y > 150
+  and t1.a = d.k
+  and d.flag = 1
+group by t1.a;
+----
+logical_plan
+01)Projection: t1.a, sum(coalesce(t2.y,Int64(0))) AS ret_sum
+02)--Aggregate: groupBy=[[t1.a]], aggr=[[sum(CASE WHEN __common_expr_1 IS NOT NULL THEN __common_expr_1 ELSE Int64(0) END) AS sum(coalesce(t2.y,Int64(0)))]]
+03)----Projection: CAST(t2.y AS Int64) AS __common_expr_1, t1.a
+04)------Inner Join: t1.a = d.k
+05)--------Projection: t1.a, t2.y
+06)----------Inner Join: t1.a = t2.x
+07)------------TableScan: t1 projection=[a]
+08)------------Filter: t2.y > Int32(150)
+09)--------------TableScan: t2 projection=[x, y]
+10)--------Projection: d.k
+11)----------Filter: d.flag = Int32(1)
+12)------------TableScan: d projection=[k, flag]
+
+query II rowsort
+select t1.a, sum(coalesce(t2.y, 0)) as ret_sum
+from t1 left join t2 on t1.a = t2.x, d
+where t2.y > 150
+  and t1.a = d.k
+  and d.flag = 1
+group by t1.a;
+----
+2 200
+
+# A CTE can introduce a query boundary between the outer filter and the
+# LEFT JOIN.
+query TT
+explain
+with s as (
+  select t1.a, t2.y
+  from t1 left join t2 on t1.a = t2.x
+)
+select s.a from s where s.y > 150;
+----
+logical_plan
+01)SubqueryAlias: s
+02)--Projection: t1.a
+03)----Inner Join: t1.a = t2.x
+04)------TableScan: t1 projection=[a]
+05)------Projection: t2.x
+06)--------Filter: t2.y > Int32(150)
+07)----------TableScan: t2 projection=[x, y]
+
+query I rowsort
+with s as (
+  select t1.a, t2.y
+  from t1 left join t2 on t1.a = t2.x
+)
+select s.a from s where s.y > 150;
+----
+2
+
 ###
 ### Cleanup
 ###
@@ -550,3 +654,6 @@ drop table t1;
 
 statement ok
 drop table t2;
+
+statement ok
+drop table d;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to