alamb commented on code in PR #22652:
URL: https://github.com/apache/datafusion/pull/22652#discussion_r3364989762


##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -15,19 +15,69 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`EliminateJoin`] rewrites `INNER JOIN` with `true`/`null`
-use crate::optimizer::ApplyOrder;
+//! [`EliminateJoin`] rewrites inner joins to simpler forms to make them 
cheaper

Review Comment:
   I always enjoy reading PRs where you learn something from the comments ❤️ 🤓 
   



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        // The only arm that rewrites a join; the rest just thread context 
down to one.
+        LogicalPlan::Join(join) => rewrite_join(join, &live, 
duplicate_insensitive),
+        LogicalPlan::Projection(Projection {
+            expr,
+            input,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the columns the projection's expressions 
reference.
+            let child_live = live_columns(&expr, input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr, input, schema,
+                )?))
+            })
+        }
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            // Adds the predicate's columns to `live` (a side used only by the 
filter stays live).
+            let mut child_live = live;
+            extend_live_columns(&mut child_live, [&predicate], 
input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+            })
+        }
+        LogicalPlan::Aggregate(Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the grouping and aggregate expressions' 
columns.
+            let child_live =
+                live_columns(group_expr.iter().chain(&aggr_expr), 
input.schema())?;
+
+            // A grouping aggregate with no aggregate functions (`GROUP BY` 
with
+            // an empty `aggr_expr`) only observes which group-key values 
exist,
+            // not how many rows produced them, so its input is duplicate-
+            // insensitive.
+            let child_duplicate_insensitive =
+                !group_expr.is_empty() && aggr_expr.is_empty();
+
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| {
+                    Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                        input, group_expr, aggr_expr, schema,
+                    )?))
+                },
+            )
+        }
+        LogicalPlan::Distinct(Distinct::All(input)) => {
+            // `SELECT DISTINCT *` is equivalent to a no-aggregate `GROUP BY`
+            // over every input column, so the input is duplicate-insensitive,
+            // but every column is part of the dedup key.
+            let child_live = all_columns(input.schema());
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::All(input)))
+            })
+        }
+        LogicalPlan::Distinct(Distinct::On(DistinctOn {
+            on_expr,
+            select_expr,
+            sort_expr,
+            input,
+            schema,
+        })) => {
+            // `DISTINCT ON (on) select [ORDER BY sort]` is a no-aggregate
+            // `GROUP BY` on the columns it reads, so its input is duplicate-
+            // insensitive; the live columns are exactly those of the
+            // ON/SELECT/ORDER BY expressions.
+            let mut child_live =
+                live_columns(on_expr.iter().chain(&select_expr), 
input.schema())?;
+            if let Some(sort_expr) = &sort_expr {
+                extend_live_columns(
+                    &mut child_live,
+                    sort_expr.iter().map(|s| &s.expr),
+                    input.schema(),
+                )?;
+            }
+
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    input,
+                    schema,
+                })))
+            })
+        }
+        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
+            // Adds the sort-key columns to `live`.
+            let mut child_live = live;
+            extend_live_columns(
+                &mut child_live,
+                expr.iter().map(|s| &s.expr),
+                input.schema(),
+            )?;
+
+            // A `fetch` (top-N) makes the row count observable, so duplicate-

Review Comment:
   I was confused at first, as I thought that every sort was duplicate-insensitive.
But now I see that the sort preserves its ancestor's insensitivity -- basically,
sorting before or after duplicate removal is equivalent (assuming the sort
doesn't have a `fetch`, i.e. it is not a TopK) 👍



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        // The only arm that rewrites a join; the rest just thread context 
down to one.
+        LogicalPlan::Join(join) => rewrite_join(join, &live, 
duplicate_insensitive),
+        LogicalPlan::Projection(Projection {
+            expr,
+            input,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the columns the projection's expressions 
reference.
+            let child_live = live_columns(&expr, input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr, input, schema,
+                )?))
+            })
+        }
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            // Adds the predicate's columns to `live` (a side used only by the 
filter stays live).
+            let mut child_live = live;
+            extend_live_columns(&mut child_live, [&predicate], 
input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+            })
+        }
+        LogicalPlan::Aggregate(Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the grouping and aggregate expressions' 
columns.
+            let child_live =
+                live_columns(group_expr.iter().chain(&aggr_expr), 
input.schema())?;
+
+            // A grouping aggregate with no aggregate functions (`GROUP BY` 
with
+            // an empty `aggr_expr`) only observes which group-key values 
exist,
+            // not how many rows produced them, so its input is duplicate-
+            // insensitive.
+            let child_duplicate_insensitive =
+                !group_expr.is_empty() && aggr_expr.is_empty();
+
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| {
+                    Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                        input, group_expr, aggr_expr, schema,
+                    )?))
+                },
+            )
+        }
+        LogicalPlan::Distinct(Distinct::All(input)) => {
+            // `SELECT DISTINCT *` is equivalent to a no-aggregate `GROUP BY`
+            // over every input column, so the input is duplicate-insensitive,
+            // but every column is part of the dedup key.
+            let child_live = all_columns(input.schema());
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::All(input)))
+            })
+        }
+        LogicalPlan::Distinct(Distinct::On(DistinctOn {
+            on_expr,
+            select_expr,
+            sort_expr,
+            input,
+            schema,
+        })) => {
+            // `DISTINCT ON (on) select [ORDER BY sort]` is a no-aggregate
+            // `GROUP BY` on the columns it reads, so its input is duplicate-
+            // insensitive; the live columns are exactly those of the
+            // ON/SELECT/ORDER BY expressions.
+            let mut child_live =
+                live_columns(on_expr.iter().chain(&select_expr), 
input.schema())?;
+            if let Some(sort_expr) = &sort_expr {
+                extend_live_columns(
+                    &mut child_live,
+                    sort_expr.iter().map(|s| &s.expr),
+                    input.schema(),
+                )?;
+            }
+
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    input,
+                    schema,
+                })))
+            })
+        }
+        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
+            // Adds the sort-key columns to `live`.
+            let mut child_live = live;
+            extend_live_columns(
+                &mut child_live,
+                expr.iter().map(|s| &s.expr),
+                input.schema(),
+            )?;
+
+            // A `fetch` (top-N) makes the row count observable, so duplicate-
+            // insensitivity does not survive past it.
+            let child_duplicate_insensitive = duplicate_insensitive && 
fetch.is_none();
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| Ok(LogicalPlan::Sort(Sort { expr, input, fetch })),
+            )
+        }
+        LogicalPlan::Limit(Limit { skip, fetch, input }) => {
+            // LIMIT makes the row count observable, so it clears 
duplicate-insensitivity.
+            rewrite_single_input(input, live, false, |input| {
+                Ok(LogicalPlan::Limit(Limit { skip, fetch, input }))
+            })
+        }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            // Re-aliases columns 1:1, so `live` and duplicate-sensitivity 
pass through unchanged.
+            rewrite_single_input(input, live, duplicate_insensitive, |input| {
+                Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+                    input, alias,
+                )?))
+            })
+        }
+        LogicalPlan::Repartition(Repartition {
+            input,
+            partitioning_scheme,
+        }) => {
+            // Adds any partitioning-key columns to `live`; 
duplicate-sensitivity is unchanged.
+            let mut child_live = live;
+            match &partitioning_scheme {
+                Partitioning::Hash(exprs, _) | 
Partitioning::DistributeBy(exprs) => {
+                    extend_live_columns(&mut child_live, exprs, 
input.schema())?;
                 }
+                Partitioning::RoundRobinBatch(_) => {}
             }
-            _ => Ok(Transformed::no(plan)),
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Repartition(Repartition {
+                    input,
+                    partitioning_scheme,
+                }))
+            })
         }
+        // Conservatively treat any other plan node as a fresh root, since we 
are
+        // not sure of its semantics with respect to duplicates or live 
columns.
+        _ => plan.map_children(|child| {
+            let live = all_columns(child.schema());
+            rewrite_subtree(child, live, false)
+        }),
     }
+}
+
+/// Recurses into a single-input node's child, threading `child_live` and
+/// `duplicate_insensitive` down, then rebuilds the node from the (possibly
+/// rewritten) child via `rebuild`. The child's `Transformed` flag is 
preserved,
+/// so the node is reported as changed exactly when its child changed.
+fn rewrite_single_input<F>(
+    input: Arc<LogicalPlan>,
+    child_live: LiveColumns,
+    duplicate_insensitive: bool,
+    rebuild: F,
+) -> Result<Transformed<LogicalPlan>>
+where
+    F: FnOnce(Arc<LogicalPlan>) -> Result<LogicalPlan>,
+{
+    rewrite_subtree(
+        Arc::unwrap_or_clone(input),
+        child_live,
+        duplicate_insensitive,
+    )?
+    .map_data(|input| rebuild(Arc::new(input)))
+}
+
+fn rewrite_join(
+    join: Join,
+    live: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    if join.join_type == JoinType::Inner
+        && join.on.is_empty()
+        && matches!(
+            join.filter.as_ref(),
+            Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _))
+        )
+    {
+        return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
+            EmptyRelation {
+                produce_one_row: false,
+                schema: join.schema,
+            },
+        )));
+    }
+
+    let (visible_left, visible_right) = split_join_output_columns(&join, live);
+
+    let rewritten_join_type =
+        rewritten_join_type(&join, &visible_left, &visible_right, 
duplicate_insensitive);
+
+    let (mut left_live, mut right_live) = match rewritten_join_type {
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (visible_left, LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), visible_right)
+        }
+        _ => (visible_left, visible_right),
+    };
+
+    add_join_condition_columns(&join, &mut left_live, &mut right_live)?;
+
+    let (left_dup_insensitive, right_dup_insensitive) =
+        child_duplicate_insensitivity(rewritten_join_type, 
duplicate_insensitive);
 
-    fn supports_rewrite(&self) -> bool {
-        true
+    let left = rewrite_subtree(
+        Arc::unwrap_or_clone(join.left),
+        left_live,
+        left_dup_insensitive,
+    )?;
+    let right = rewrite_subtree(
+        Arc::unwrap_or_clone(join.right),
+        right_live,
+        right_dup_insensitive,
+    )?;
+
+    let changed =
+        left.transformed || right.transformed || rewritten_join_type != 
join.join_type;
+    let left = Arc::new(left.data);
+    let right = Arc::new(right.data);
+
+    if changed {
+        // The join type or an input changed, so the output schema may have
+        // narrowed; recompute it via `try_new`.
+        Ok(Transformed::yes(LogicalPlan::Join(Join::try_new(
+            left,
+            right,
+            join.on,
+            join.filter,
+            rewritten_join_type,
+            join.join_constraint,
+            join.null_equality,
+            join.null_aware,
+        )?)))
+    } else {
+        // Nothing changed; reassemble the join reusing its existing schema 
rather
+        // than recomputing it.
+        Ok(Transformed::no(LogicalPlan::Join(Join {
+            left,
+            right,
+            on: join.on,
+            filter: join.filter,
+            join_type: join.join_type,
+            join_constraint: join.join_constraint,
+            schema: join.schema,
+            null_equality: join.null_equality,
+            null_aware: join.null_aware,
+        })))
+    }
+}
+
+/// Returns which join inputs can safely ignore duplicate rows from their own
+/// descendants. For semi/anti/mark joins, duplicates from the existence side 
do
+/// not change the result even when the parent itself is duplicate-sensitive.
+fn child_duplicate_insensitivity(
+    join_type: JoinType,
+    duplicate_insensitive: bool,
+) -> (bool, bool) {
+    match join_type {
+        JoinType::Inner => (duplicate_insensitive, duplicate_insensitive),
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (duplicate_insensitive, true)
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (true, duplicate_insensitive)
+        }
+        JoinType::Left | JoinType::Right | JoinType::Full => (false, false),
     }
 }
 
+/// Rewrites an inner join to a semi join when the removed side has no

Review Comment:
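   Minor: these live-column helpers might be better encapsulated as methods on `LiveColumns` (since `LiveColumns` is currently a type alias for `HashSet<usize>`, that would mean wrapping it in a newtype).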



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -15,19 +15,69 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`EliminateJoin`] rewrites `INNER JOIN` with `true`/`null`
-use crate::optimizer::ApplyOrder;
+//! [`EliminateJoin`] rewrites inner joins to simpler forms to make them 
cheaper
+//! to evaluate. We implement two distinct rewrites:
+//!
+//! * An inner join can be rewritten to an empty relation if the join condition
+//!   is trivially false.
+//!
+//! * An inner join `L ⋈ R` can be rewritten to a left semi join `L ⋉ R`
+//!   (`LeftSemi`), which keeps the rows of L that have a match in R and 
outputs
+//!   only L's columns. The rewrite to `L ⋉ R` is valid when both of the
+//!   following are true:
+//!
+//!     1. None of R's columns are referenced above the join.

Review Comment:
   👍 



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -15,19 +15,69 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`EliminateJoin`] rewrites `INNER JOIN` with `true`/`null`
-use crate::optimizer::ApplyOrder;
+//! [`EliminateJoin`] rewrites inner joins to simpler forms to make them 
cheaper
+//! to evaluate. We implement two distinct rewrites:
+//!
+//! * An inner join can be rewritten to an empty relation if the join condition
+//!   is trivially false.
+//!
+//! * An inner join `L ⋈ R` can be rewritten to a left semi join `L ⋉ R`
+//!   (`LeftSemi`), which keeps the rows of L that have a match in R and 
outputs
+//!   only L's columns. The rewrite to `L ⋉ R` is valid when both of the
+//!   following are true:
+//!
+//!     1. None of R's columns are referenced above the join.
+//!     2. R does not observably multiply L's rows. This holds when either the
+//!        join's ancestors are duplicate-insensitive (e.g., DISTINCT) or we 
can use
+//!        functional dependencies to prove that each L row matches at most 
one R
+//!        row (R is provably unique on the join keys).
+//!
+//! # Overview
+//!
+//! `rewrite_subtree` walks the plan top-down, threading two pieces of context
+//! down to each join:
+//!
+//! * `live` — which of the join's output columns are referenced above it. It 
is
+//!   propagated top-down: each node asks its children only for the columns it
+//!   needs from them, so a projection or aggregate asks for just the columns 
its
+//!   expressions reference, dropping the rest (the narrowing); a join splits 
the
+//!   set across its two inputs.
+//! * `duplicate_insensitive` — whether emitting each row once instead of many
+//!   times will not change the output. A duplicate-collapsing node (e.g.,
+//!   DISTINCT, GROUP BY with no aggregate functions, or the existence side of 
a
+//!   semi/anti/mark join) sets it `true` for its subtree, and it propagates
+//!   downward until a node that makes the row count observable again (a 
`LIMIT`,
+//!   a top-N sort, ...) clears it. It is therefore fixed by the nearest such
+//!   node, not by the whole ancestor chain: a collapsing node shields its 
subtree,
+//!   so a duplicate-sensitive node further above does not matter.
+//!
+//! At each join, `rewritten_join_type` combines this context with the side's
+//! functional dependencies to choose `Inner`, `LeftSemi`, or `RightSemi`. Most
+//! node types just forward the context to their single child via
+//! `rewrite_single_input`; nodes that alter column requirements or
+//! duplicate-sensitivity (projection, aggregate, sort, ...) adjust it first.
+use crate::utils::for_each_referenced_index;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::tree_node::Transformed;
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::JoinType::Inner;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{
+    DFSchema, Dependency, HashSet, NullEquality, Result, ScalarValue,
+};
 use datafusion_expr::{
-    Expr,
-    logical_plan::{EmptyRelation, LogicalPlan},
+    Expr, JoinType,
+    logical_plan::{
+        Aggregate, Distinct, DistinctOn, EmptyRelation, Filter, Join, Limit, 
LogicalPlan,
+        Partitioning, Projection, Repartition, Sort, SubqueryAlias,
+    },
 };
+use std::sync::Arc;
+
+/// The columns that are "live" at a plan node, i.e., which of its output
+/// columns are referenced by an ancestor node. Represented as a set of column
+/// indices, relative to the node's schema.

Review Comment:
   It might be useful for this doc comment to refer back to the module-level
comments, which explain this structure in more detail.



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {

Review Comment:
   I think this code depends on being able to walk all expressions used in the
plan to remain correct. I worry that if we ever modify a LogicalPlan node to add a
new Expr container, this code would continue to compile, and we would
probably end up with subtle bugs.

   Some ideas on how to address this:
   1. Use this API for visiting expressions/finding live columns:
https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html#method.apply_expressions
   2. Name all fields (remove `..`) so that if any field is added, the PR that adds it
must also explicitly update this file.



##########
datafusion/sqllogictest/test_files/tpch/plans/q2.slt.part:
##########
@@ -65,9 +65,9 @@ limit 10;
 logical_plan
 01)Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, 
supplier.s_name ASC NULLS LAST, part.p_partkey ASC NULLS LAST, fetch=10
 02)--Projection: supplier.s_acctbal, supplier.s_name, nation.n_name, 
part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, 
supplier.s_comment
-03)----Inner Join: part.p_partkey = __scalar_sq_1.ps_partkey, 
partsupp.ps_supplycost = __scalar_sq_1.min(partsupp.ps_supplycost)
+03)----LeftSemi Join: part.p_partkey = __scalar_sq_1.ps_partkey, 
partsupp.ps_supplycost = __scalar_sq_1.min(partsupp.ps_supplycost)

Review Comment:
   For anyone else reviewing this: TPC-H has FK:PK joins in its schema, so this
transformation can potentially help here.



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -15,19 +15,75 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`EliminateJoin`] rewrites `INNER JOIN` with `true`/`null`
-use crate::optimizer::ApplyOrder;
+//! [`EliminateJoin`] rewrites inner joins to simpler forms to make them 
cheaper
+//! to evaluate.
+//!
+//! # What it rewrites
+//!
+//! * An inner join can be rewritten to an empty relation if the join condition
+//!   is trivially false.
+//!
+//! * An inner join `L ⋈ R` can be rewritten to a left semi join `L ⋉ R`
+//!   (`LeftSemi`), which keeps the rows of L that have a match in R and 
outputs
+//!   only L's columns. The rewrite to `L ⋉ R` is valid when both of the
+//!   following are true:
+//!
+//!     1. None of R's columns are referenced above the join.
+//!     2. R does not observably multiply L's rows. This holds when either the
+//!        join's ancestors are duplicate-insensitive (e.g., DISTINCT) or we 
can use
+//!        functional dependencies to prove that each L row matches at most 
one R
+//!        row (R is provably unique on the join keys).
+//!
+//! # How it works
+//!
+//! `rewrite_subtree` walks the plan top-down, threading two pieces of context
+//! down to each join:
+//!
+//! * `live` — which of the join's output columns are referenced above it. It 
is
+//!   propagated top-down: each node asks its children only for the columns it
+//!   needs from them, so a projection or aggregate asks for just the columns 
its
+//!   expressions reference, dropping the rest (the narrowing); a join splits 
the
+//!   set across its two inputs.
+//! * `duplicate_insensitive` — whether emitting each row once instead of many
+//!   times will not change the output. A duplicate-collapsing node (e.g.,
+//!   DISTINCT, GROUP BY with no aggregate functions, or the existence side of 
a
+//!   semi/anti/mark join) sets it `true` for its subtree, and it propagates
+//!   downward until a node that makes the row count observable again (a 
`LIMIT`,
+//!   a top-N sort, ...) clears it. It is therefore fixed by the nearest such
+//!   node, not by the whole ancestor chain: a collapsing node shields its 
subtree,
+//!   so a duplicate-sensitive node further above does not matter.
+//!
+//! At each join, `rewritten_join_type` combines this context with the side's
+//! functional dependencies to choose `Inner`, `LeftSemi`, or `RightSemi`. Most
+//! node types just forward the context to their single child via
+//! `rewrite_single_input`; nodes that alter column requirements or
+//! duplicate-sensitivity (projection, aggregate, sort, ...) adjust it first.
+//!
+//! Joins nested inside subquery expressions are reached as well: 
`rewrite_subtree`
+//! descends into each node's subquery plans itself (via `map_subqueries`),
+//! seeding each as a fresh root.
+use crate::utils::for_each_referenced_index;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::tree_node::Transformed;
-use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::JoinType::Inner;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{
+    DFSchema, Dependency, HashSet, NullEquality, Result, ScalarValue,
+};
 use datafusion_expr::{
-    Expr,
-    logical_plan::{EmptyRelation, LogicalPlan},
+    Expr, JoinType, SortExpr,
+    logical_plan::{
+        Aggregate, Distinct, DistinctOn, EmptyRelation, Filter, Join, Limit, 
LogicalPlan,
+        Partitioning, Projection, Repartition, Sort, SubqueryAlias,
+    },
 };
+use std::sync::Arc;
+
+/// The columns that are "live" at a plan node, i.e., which of its output
+/// columns are referenced by an ancestor node. Represented as a set of column
+/// indices, relative to the node's schema.
+type LiveColumns = HashSet<usize>;

Review Comment:
   You could potentially use arrow's `BooleanBufferBuilder` as a mutable bitset:
https://docs.rs/arrow/latest/arrow/array/struct.BooleanBufferBuilder.html

   I'm not sure whether that is actually faster, though.



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        // The only arm that rewrites a join; the rest just thread context 
down to one.
+        LogicalPlan::Join(join) => rewrite_join(join, &live, 
duplicate_insensitive),
+        LogicalPlan::Projection(Projection {
+            expr,
+            input,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the columns the projection's expressions 
reference.
+            let child_live = live_columns(&expr, input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr, input, schema,
+                )?))
+            })
+        }
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            // Adds the predicate's columns to `live` (a side used only by the 
filter stays live).
+            let mut child_live = live;
+            extend_live_columns(&mut child_live, [&predicate], 
input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+            })
+        }
+        LogicalPlan::Aggregate(Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the grouping and aggregate expressions' 
columns.
+            let child_live =
+                live_columns(group_expr.iter().chain(&aggr_expr), 
input.schema())?;
+
+            // A grouping aggregate with no aggregate functions (`GROUP BY` 
with
+            // an empty `aggr_expr`) only observes which group-key values 
exist,
+            // not how many rows produced them, so its input is duplicate-
+            // insensitive.
+            let child_duplicate_insensitive =
+                !group_expr.is_empty() && aggr_expr.is_empty();
+
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| {
+                    Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                        input, group_expr, aggr_expr, schema,
+                    )?))
+                },
+            )
+        }
+        LogicalPlan::Distinct(Distinct::All(input)) => {
+            // `SELECT DISTINCT *` is equivalent to a no-aggregate `GROUP BY`
+            // over every input column, so the input is duplicate-insensitive,
+            // but every column is part of the dedup key.
+            let child_live = all_columns(input.schema());
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::All(input)))
+            })
+        }
+        LogicalPlan::Distinct(Distinct::On(DistinctOn {
+            on_expr,
+            select_expr,
+            sort_expr,
+            input,
+            schema,
+        })) => {
+            // `DISTINCT ON (on) select [ORDER BY sort]` is a no-aggregate
+            // `GROUP BY` on the columns it reads, so its input is duplicate-
+            // insensitive; the live columns are exactly those of the
+            // ON/SELECT/ORDER BY expressions.
+            let mut child_live =
+                live_columns(on_expr.iter().chain(&select_expr), 
input.schema())?;
+            if let Some(sort_expr) = &sort_expr {
+                extend_live_columns(
+                    &mut child_live,
+                    sort_expr.iter().map(|s| &s.expr),
+                    input.schema(),
+                )?;
+            }
+
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    input,
+                    schema,
+                })))
+            })
+        }
+        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
+            // Adds the sort-key columns to `live`.
+            let mut child_live = live;
+            extend_live_columns(
+                &mut child_live,
+                expr.iter().map(|s| &s.expr),
+                input.schema(),
+            )?;
+
+            // A `fetch` (top-N) makes the row count observable, so duplicate-
+            // insensitivity does not survive past it.
+            let child_duplicate_insensitive = duplicate_insensitive && 
fetch.is_none();
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| Ok(LogicalPlan::Sort(Sort { expr, input, fetch })),
+            )
+        }
+        LogicalPlan::Limit(Limit { skip, fetch, input }) => {
+            // LIMIT makes the row count observable, so it clears 
duplicate-insensitivity.
+            rewrite_single_input(input, live, false, |input| {
+                Ok(LogicalPlan::Limit(Limit { skip, fetch, input }))
+            })
+        }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            // Re-aliases columns 1:1, so `live` and duplicate-sensitivity 
pass through unchanged.
+            rewrite_single_input(input, live, duplicate_insensitive, |input| {
+                Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+                    input, alias,
+                )?))
+            })
+        }
+        LogicalPlan::Repartition(Repartition {
+            input,
+            partitioning_scheme,
+        }) => {
+            // Adds any partitioning-key columns to `live`; 
duplicate-sensitivity is unchanged.
+            let mut child_live = live;
+            match &partitioning_scheme {
+                Partitioning::Hash(exprs, _) | 
Partitioning::DistributeBy(exprs) => {
+                    extend_live_columns(&mut child_live, exprs, 
input.schema())?;
                 }
+                Partitioning::RoundRobinBatch(_) => {}
             }
-            _ => Ok(Transformed::no(plan)),
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Repartition(Repartition {
+                    input,
+                    partitioning_scheme,
+                }))
+            })
         }
+        // Conservatively treat any other plan node as a fresh root, since we 
are
+        // not sure of its semantics with respect to duplicates or live 
columns.
+        _ => plan.map_children(|child| {
+            let live = all_columns(child.schema());
+            rewrite_subtree(child, live, false)
+        }),
     }
+}
+
+/// Recurses into a single-input node's child, threading `child_live` and
+/// `duplicate_insensitive` down, then rebuilds the node from the (possibly
+/// rewritten) child via `rebuild`. The child's `Transformed` flag is 
preserved,
+/// so the node is reported as changed exactly when its child changed.
+fn rewrite_single_input<F>(
+    input: Arc<LogicalPlan>,
+    child_live: LiveColumns,
+    duplicate_insensitive: bool,
+    rebuild: F,
+) -> Result<Transformed<LogicalPlan>>
+where
+    F: FnOnce(Arc<LogicalPlan>) -> Result<LogicalPlan>,
+{
+    rewrite_subtree(
+        Arc::unwrap_or_clone(input),
+        child_live,
+        duplicate_insensitive,
+    )?
+    .map_data(|input| rebuild(Arc::new(input)))
+}
+
+fn rewrite_join(
+    join: Join,
+    live: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    if join.join_type == JoinType::Inner
+        && join.on.is_empty()
+        && matches!(
+            join.filter.as_ref(),
+            Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _))
+        )
+    {
+        return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
+            EmptyRelation {
+                produce_one_row: false,
+                schema: join.schema,
+            },
+        )));
+    }
+
+    let (visible_left, visible_right) = split_join_output_columns(&join, live);
+
+    let rewritten_join_type =
+        rewritten_join_type(&join, &visible_left, &visible_right, 
duplicate_insensitive);
+
+    let (mut left_live, mut right_live) = match rewritten_join_type {
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (visible_left, LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), visible_right)
+        }
+        _ => (visible_left, visible_right),
+    };
+
+    add_join_condition_columns(&join, &mut left_live, &mut right_live)?;
+
+    let (left_dup_insensitive, right_dup_insensitive) =
+        child_duplicate_insensitivity(rewritten_join_type, 
duplicate_insensitive);
 
-    fn supports_rewrite(&self) -> bool {
-        true
+    let left = rewrite_subtree(
+        Arc::unwrap_or_clone(join.left),
+        left_live,
+        left_dup_insensitive,
+    )?;
+    let right = rewrite_subtree(
+        Arc::unwrap_or_clone(join.right),
+        right_live,
+        right_dup_insensitive,
+    )?;
+
+    let changed =
+        left.transformed || right.transformed || rewritten_join_type != 
join.join_type;
+    let left = Arc::new(left.data);
+    let right = Arc::new(right.data);
+
+    if changed {
+        // The join type or an input changed, so the output schema may have
+        // narrowed; recompute it via `try_new`.
+        Ok(Transformed::yes(LogicalPlan::Join(Join::try_new(
+            left,
+            right,
+            join.on,
+            join.filter,
+            rewritten_join_type,
+            join.join_constraint,
+            join.null_equality,
+            join.null_aware,
+        )?)))
+    } else {
+        // Nothing changed; reassemble the join reusing its existing schema 
rather
+        // than recomputing it.
+        Ok(Transformed::no(LogicalPlan::Join(Join {
+            left,
+            right,
+            on: join.on,
+            filter: join.filter,
+            join_type: join.join_type,
+            join_constraint: join.join_constraint,
+            schema: join.schema,
+            null_equality: join.null_equality,
+            null_aware: join.null_aware,
+        })))
+    }
+}
+
+/// Returns which join inputs can safely ignore duplicate rows from their own
+/// descendants. For semi/anti/mark joins, duplicates from the existence side 
do
+/// not change the result even when the parent itself is duplicate-sensitive.
+fn child_duplicate_insensitivity(
+    join_type: JoinType,
+    duplicate_insensitive: bool,
+) -> (bool, bool) {
+    match join_type {
+        JoinType::Inner => (duplicate_insensitive, duplicate_insensitive),
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (duplicate_insensitive, true)
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (true, duplicate_insensitive)
+        }
+        JoinType::Left | JoinType::Right | JoinType::Full => (false, false),
     }
 }
 
+/// Rewrites an inner join to a semi join when the removed side has no
+/// parent-visible columns and either the parent ignores duplicate output rows 
or
+/// the removed side is unique on the join keys.
+fn rewritten_join_type(
+    join: &Join,
+    visible_left: &LiveColumns,
+    visible_right: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> JoinType {
+    if join.join_type != JoinType::Inner || join.on.is_empty() {
+        return join.join_type;
+    }
+
+    let can_remove_right = duplicate_insensitive
+        || side_unique_on_join(
+            join.right.schema(),
+            join.on.iter().map(|(_, right)| right),
+            join.null_equality,
+        );
+    if visible_right.is_empty() && can_remove_right {
+        return JoinType::LeftSemi;
+    }
+
+    let can_remove_left = duplicate_insensitive
+        || side_unique_on_join(
+            join.left.schema(),
+            join.on.iter().map(|(left, _)| left),
+            join.null_equality,
+        );
+    if visible_left.is_empty() && can_remove_left {
+        return JoinType::RightSemi;
+    }
+
+    JoinType::Inner
+}
+
+fn add_join_condition_columns(
+    join: &Join,
+    left_live: &mut LiveColumns,
+    right_live: &mut LiveColumns,
+) -> Result<()> {
+    extend_live_columns(
+        left_live,
+        join.on.iter().map(|(l, _)| l),
+        join.left.schema(),
+    )?;
+    extend_live_columns(
+        right_live,
+        join.on.iter().map(|(_, r)| r),
+        join.right.schema(),
+    )?;
+
+    if let Some(filter) = &join.filter {
+        extend_live_columns(left_live, [filter], join.left.schema())?;
+        extend_live_columns(right_live, [filter], join.right.schema())?;
+    }
+
+    Ok(())
+}
+
+fn split_join_output_columns(
+    join: &Join,
+    live: &LiveColumns,
+) -> (LiveColumns, LiveColumns) {
+    let left_len = join.left.schema().fields().len();
+    match join.join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            split_columns_at(live, left_len)
+        }
+        // A semi/anti/mark join outputs only the surviving side's columns, 
with
+        // the same index space, so `live` passes straight through to that 
side.
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (live.clone(), LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), live.clone())
+        }
+    }
+}
+
+fn split_columns_at(live: &LiveColumns, left_len: usize) -> (LiveColumns, 
LiveColumns) {
+    let mut left = LiveColumns::new();
+    let mut right = LiveColumns::new();
+    for idx in live {
+        if *idx < left_len {
+            left.insert(*idx);
+        } else {
+            right.insert(*idx - left_len);
+        }
+    }
+    (left, right)
+}
+
+fn side_unique_on_join<'a>(

Review Comment:
   Would this be better (easier to discover and reuse) as a method on `Join`?



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        // The only arm that rewrites a join; the rest just thread context 
down to one.
+        LogicalPlan::Join(join) => rewrite_join(join, &live, 
duplicate_insensitive),
+        LogicalPlan::Projection(Projection {
+            expr,
+            input,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the columns the projection's expressions 
reference.
+            let child_live = live_columns(&expr, input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr, input, schema,
+                )?))
+            })
+        }
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            // Adds the predicate's columns to `live` (a side used only by the 
filter stays live).
+            let mut child_live = live;
+            extend_live_columns(&mut child_live, [&predicate], 
input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+            })
+        }
+        LogicalPlan::Aggregate(Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the grouping and aggregate expressions' 
columns.
+            let child_live =
+                live_columns(group_expr.iter().chain(&aggr_expr), 
input.schema())?;
+
+            // A grouping aggregate with no aggregate functions (`GROUP BY` 
with
+            // an empty `aggr_expr`) only observes which group-key values 
exist,
+            // not how many rows produced them, so its input is duplicate-
+            // insensitive.
+            let child_duplicate_insensitive =
+                !group_expr.is_empty() && aggr_expr.is_empty();
+
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| {
+                    Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                        input, group_expr, aggr_expr, schema,
+                    )?))
+                },
+            )
+        }
+        LogicalPlan::Distinct(Distinct::All(input)) => {
+            // `SELECT DISTINCT *` is equivalent to a no-aggregate `GROUP BY`
+            // over every input column, so the input is duplicate-insensitive,
+            // but every column is part of the dedup key.
+            let child_live = all_columns(input.schema());
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::All(input)))
+            })
+        }
+        LogicalPlan::Distinct(Distinct::On(DistinctOn {
+            on_expr,
+            select_expr,
+            sort_expr,
+            input,
+            schema,
+        })) => {
+            // `DISTINCT ON (on) select [ORDER BY sort]` is a no-aggregate
+            // `GROUP BY` on the columns it reads, so its input is duplicate-
+            // insensitive; the live columns are exactly those of the
+            // ON/SELECT/ORDER BY expressions.
+            let mut child_live =
+                live_columns(on_expr.iter().chain(&select_expr), 
input.schema())?;
+            if let Some(sort_expr) = &sort_expr {
+                extend_live_columns(
+                    &mut child_live,
+                    sort_expr.iter().map(|s| &s.expr),
+                    input.schema(),
+                )?;
+            }
+
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    input,
+                    schema,
+                })))
+            })
+        }
+        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
+            // Adds the sort-key columns to `live`.
+            let mut child_live = live;
+            extend_live_columns(
+                &mut child_live,
+                expr.iter().map(|s| &s.expr),
+                input.schema(),
+            )?;
+
+            // A `fetch` (top-N) makes the row count observable, so duplicate-
+            // insensitivity does not survive past it.
+            let child_duplicate_insensitive = duplicate_insensitive && 
fetch.is_none();
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| Ok(LogicalPlan::Sort(Sort { expr, input, fetch })),
+            )
+        }
+        LogicalPlan::Limit(Limit { skip, fetch, input }) => {
+            // LIMIT makes the row count observable, so it clears 
duplicate-insensitivity.
+            rewrite_single_input(input, live, false, |input| {
+                Ok(LogicalPlan::Limit(Limit { skip, fetch, input }))
+            })
+        }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            // Re-aliases columns 1:1, so `live` and duplicate-sensitivity 
pass through unchanged.
+            rewrite_single_input(input, live, duplicate_insensitive, |input| {
+                Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+                    input, alias,
+                )?))
+            })
+        }
+        LogicalPlan::Repartition(Repartition {
+            input,
+            partitioning_scheme,
+        }) => {
+            // Adds any partitioning-key columns to `live`; 
duplicate-sensitivity is unchanged.
+            let mut child_live = live;
+            match &partitioning_scheme {
+                Partitioning::Hash(exprs, _) | 
Partitioning::DistributeBy(exprs) => {
+                    extend_live_columns(&mut child_live, exprs, 
input.schema())?;
                 }
+                Partitioning::RoundRobinBatch(_) => {}
             }
-            _ => Ok(Transformed::no(plan)),
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Repartition(Repartition {
+                    input,
+                    partitioning_scheme,
+                }))
+            })
         }
+        // Conservatively treat any other plan node as a fresh root, since we 
are
+        // not sure of its semantics with respect to duplicates or live 
columns.
+        _ => plan.map_children(|child| {
+            let live = all_columns(child.schema());
+            rewrite_subtree(child, live, false)
+        }),
     }
+}
+
+/// Recurses into a single-input node's child, threading `child_live` and
+/// `duplicate_insensitive` down, then rebuilds the node from the (possibly
+/// rewritten) child via `rebuild`. The child's `Transformed` flag is 
preserved,
+/// so the node is reported as changed exactly when its child changed.
+fn rewrite_single_input<F>(
+    input: Arc<LogicalPlan>,
+    child_live: LiveColumns,
+    duplicate_insensitive: bool,
+    rebuild: F,
+) -> Result<Transformed<LogicalPlan>>
+where
+    F: FnOnce(Arc<LogicalPlan>) -> Result<LogicalPlan>,
+{
+    rewrite_subtree(
+        Arc::unwrap_or_clone(input),
+        child_live,
+        duplicate_insensitive,
+    )?
+    .map_data(|input| rebuild(Arc::new(input)))
+}
+
+fn rewrite_join(
+    join: Join,
+    live: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    if join.join_type == JoinType::Inner
+        && join.on.is_empty()
+        && matches!(
+            join.filter.as_ref(),
+            Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _))
+        )
+    {
+        return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
+            EmptyRelation {
+                produce_one_row: false,
+                schema: join.schema,
+            },
+        )));
+    }
+
+    let (visible_left, visible_right) = split_join_output_columns(&join, live);
+
+    let rewritten_join_type =
+        rewritten_join_type(&join, &visible_left, &visible_right, 
duplicate_insensitive);
+
+    let (mut left_live, mut right_live) = match rewritten_join_type {
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (visible_left, LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), visible_right)
+        }
+        _ => (visible_left, visible_right),
+    };
+
+    add_join_condition_columns(&join, &mut left_live, &mut right_live)?;
+
+    let (left_dup_insensitive, right_dup_insensitive) =
+        child_duplicate_insensitivity(rewritten_join_type, 
duplicate_insensitive);
 
-    fn supports_rewrite(&self) -> bool {
-        true
+    let left = rewrite_subtree(
+        Arc::unwrap_or_clone(join.left),
+        left_live,
+        left_dup_insensitive,
+    )?;
+    let right = rewrite_subtree(
+        Arc::unwrap_or_clone(join.right),
+        right_live,
+        right_dup_insensitive,
+    )?;
+
+    let changed =
+        left.transformed || right.transformed || rewritten_join_type != 
join.join_type;
+    let left = Arc::new(left.data);
+    let right = Arc::new(right.data);
+
+    if changed {
+        // The join type or an input changed, so the output schema may have
+        // narrowed; recompute it via `try_new`.
+        Ok(Transformed::yes(LogicalPlan::Join(Join::try_new(
+            left,
+            right,
+            join.on,
+            join.filter,
+            rewritten_join_type,
+            join.join_constraint,
+            join.null_equality,
+            join.null_aware,
+        )?)))
+    } else {
+        // Nothing changed; reassemble the join reusing its existing schema 
rather
+        // than recomputing it.
+        Ok(Transformed::no(LogicalPlan::Join(Join {
+            left,
+            right,
+            on: join.on,
+            filter: join.filter,
+            join_type: join.join_type,
+            join_constraint: join.join_constraint,
+            schema: join.schema,
+            null_equality: join.null_equality,
+            null_aware: join.null_aware,
+        })))
+    }
+}
+
+/// Returns which join inputs can safely ignore duplicate rows from their own
+/// descendants. For semi/anti/mark joins, duplicates from the existence side 
do
+/// not change the result even when the parent itself is duplicate-sensitive.
+fn child_duplicate_insensitivity(
+    join_type: JoinType,
+    duplicate_insensitive: bool,
+) -> (bool, bool) {
+    match join_type {
+        JoinType::Inner => (duplicate_insensitive, duplicate_insensitive),
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (duplicate_insensitive, true)
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (true, duplicate_insensitive)
+        }
+        JoinType::Left | JoinType::Right | JoinType::Full => (false, false),
     }
 }
 
+/// Rewrites an inner join to a semi join when the removed side has no
+/// parent-visible columns and either the parent ignores duplicate output rows 
or
+/// the removed side is unique on the join keys.
+fn rewritten_join_type(
+    join: &Join,
+    visible_left: &LiveColumns,
+    visible_right: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> JoinType {
+    if join.join_type != JoinType::Inner || join.on.is_empty() {
+        return join.join_type;
+    }
+
+    let can_remove_right = duplicate_insensitive
+        || side_unique_on_join(
+            join.right.schema(),
+            join.on.iter().map(|(_, right)| right),
+            join.null_equality,
+        );
+    if visible_right.is_empty() && can_remove_right {
+        return JoinType::LeftSemi;
+    }
+
+    let can_remove_left = duplicate_insensitive
+        || side_unique_on_join(
+            join.left.schema(),
+            join.on.iter().map(|(left, _)| left),
+            join.null_equality,
+        );
+    if visible_left.is_empty() && can_remove_left {
+        return JoinType::RightSemi;
+    }
+
+    JoinType::Inner
+}
+
+fn add_join_condition_columns(
+    join: &Join,
+    left_live: &mut LiveColumns,
+    right_live: &mut LiveColumns,
+) -> Result<()> {
+    extend_live_columns(
+        left_live,
+        join.on.iter().map(|(l, _)| l),
+        join.left.schema(),
+    )?;
+    extend_live_columns(
+        right_live,
+        join.on.iter().map(|(_, r)| r),
+        join.right.schema(),
+    )?;
+
+    if let Some(filter) = &join.filter {
+        extend_live_columns(left_live, [filter], join.left.schema())?;
+        extend_live_columns(right_live, [filter], join.right.schema())?;
+    }
+
+    Ok(())
+}
+
+fn split_join_output_columns(
+    join: &Join,
+    live: &LiveColumns,
+) -> (LiveColumns, LiveColumns) {
+    let left_len = join.left.schema().fields().len();
+    match join.join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            split_columns_at(live, left_len)
+        }
+        // A semi/anti/mark join outputs only the surviving side's columns, 
with
+        // the same index space, so `live` passes straight through to that 
side.
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (live.clone(), LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), live.clone())
+        }
+    }
+}
+
+fn split_columns_at(live: &LiveColumns, left_len: usize) -> (LiveColumns, 
LiveColumns) {
+    let mut left = LiveColumns::new();
+    let mut right = LiveColumns::new();
+    for idx in live {
+        if *idx < left_len {
+            left.insert(*idx);
+        } else {
+            right.insert(*idx - left_len);
+        }
+    }
+    (left, right)
+}
+
+fn side_unique_on_join<'a>(
+    schema: &DFSchema,
+    join_exprs: impl Iterator<Item = &'a Expr>,
+    null_equality: NullEquality,
+) -> bool {
+    let join_key_indices = join_exprs
+        .filter_map(|expr| match expr {
+            Expr::Alias(alias) => alias.expr.as_ref().try_as_col(),
+            _ => expr.try_as_col(),
+        })
+        .filter_map(|column| schema.maybe_index_of_column(column))
+        .collect::<Vec<usize>>();
+
+    schema.functional_dependencies().iter().any(|dependency| {
+        dependency.mode == Dependency::Single
+            && (!dependency.nullable || null_equality == 
NullEquality::NullEqualsNothing)
+            && dependency
+                .source_indices
+                .iter()
+                .all(|idx| join_key_indices.contains(idx))
+    })
+}
+
+fn all_columns(schema: &DFSchema) -> LiveColumns {
+    (0..schema.fields().len()).collect()
+}
+
+/// The columns of `schema` referenced by any of `exprs`.
+fn live_columns<'a>(

Review Comment:
   This could potentially be `LiveColumns::try_new()` rather than a free function.



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        // The only arm that rewrites a join; the rest just thread context 
down to one.
+        LogicalPlan::Join(join) => rewrite_join(join, &live, 
duplicate_insensitive),
+        LogicalPlan::Projection(Projection {
+            expr,
+            input,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the columns the projection's expressions 
reference.
+            let child_live = live_columns(&expr, input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr, input, schema,
+                )?))
+            })
+        }
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            // Adds the predicate's columns to `live` (a side used only by the 
filter stays live).
+            let mut child_live = live;
+            extend_live_columns(&mut child_live, [&predicate], 
input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+            })
+        }
+        LogicalPlan::Aggregate(Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the grouping and aggregate expressions' 
columns.
+            let child_live =
+                live_columns(group_expr.iter().chain(&aggr_expr), 
input.schema())?;
+
+            // A grouping aggregate with no aggregate functions (`GROUP BY` 
with
+            // an empty `aggr_expr`) only observes which group-key values 
exist,
+            // not how many rows produced them, so its input is duplicate-
+            // insensitive.
+            let child_duplicate_insensitive =
+                !group_expr.is_empty() && aggr_expr.is_empty();
+
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| {
+                    Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                        input, group_expr, aggr_expr, schema,
+                    )?))
+                },
+            )
+        }
+        LogicalPlan::Distinct(Distinct::All(input)) => {
+            // `SELECT DISTINCT *` is equivalent to a no-aggregate `GROUP BY`
+            // over every input column, so the input is duplicate-insensitive,
+            // but every column is part of the dedup key.
+            let child_live = all_columns(input.schema());
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::All(input)))
+            })
+        }
+        LogicalPlan::Distinct(Distinct::On(DistinctOn {
+            on_expr,
+            select_expr,
+            sort_expr,
+            input,
+            schema,
+        })) => {
+            // `DISTINCT ON (on) select [ORDER BY sort]` is a no-aggregate
+            // `GROUP BY` on the columns it reads, so its input is duplicate-
+            // insensitive; the live columns are exactly those of the
+            // ON/SELECT/ORDER BY expressions.
+            let mut child_live =
+                live_columns(on_expr.iter().chain(&select_expr), 
input.schema())?;
+            if let Some(sort_expr) = &sort_expr {
+                extend_live_columns(
+                    &mut child_live,
+                    sort_expr.iter().map(|s| &s.expr),
+                    input.schema(),
+                )?;
+            }
+
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    input,
+                    schema,
+                })))
+            })
+        }
+        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
+            // Adds the sort-key columns to `live`.
+            let mut child_live = live;
+            extend_live_columns(
+                &mut child_live,
+                expr.iter().map(|s| &s.expr),
+                input.schema(),
+            )?;
+
+            // A `fetch` (top-N) makes the row count observable, so duplicate-
+            // insensitivity does not survive past it.
+            let child_duplicate_insensitive = duplicate_insensitive && 
fetch.is_none();
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| Ok(LogicalPlan::Sort(Sort { expr, input, fetch })),
+            )
+        }
+        LogicalPlan::Limit(Limit { skip, fetch, input }) => {
+            // LIMIT makes the row count observable, so it clears 
duplicate-insensitivity.
+            rewrite_single_input(input, live, false, |input| {
+                Ok(LogicalPlan::Limit(Limit { skip, fetch, input }))
+            })
+        }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            // Re-aliases columns 1:1, so `live` and duplicate-sensitivity 
pass through unchanged.
+            rewrite_single_input(input, live, duplicate_insensitive, |input| {
+                Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+                    input, alias,
+                )?))
+            })
+        }
+        LogicalPlan::Repartition(Repartition {
+            input,
+            partitioning_scheme,
+        }) => {
+            // Adds any partitioning-key columns to `live`; 
duplicate-sensitivity is unchanged.
+            let mut child_live = live;
+            match &partitioning_scheme {
+                Partitioning::Hash(exprs, _) | 
Partitioning::DistributeBy(exprs) => {
+                    extend_live_columns(&mut child_live, exprs, 
input.schema())?;
                 }
+                Partitioning::RoundRobinBatch(_) => {}
             }
-            _ => Ok(Transformed::no(plan)),
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Repartition(Repartition {
+                    input,
+                    partitioning_scheme,
+                }))
+            })
         }
+        // Conservatively treat any other plan node as a fresh root, since we 
are
+        // not sure of its semantics with respect to duplicates or live 
columns.
+        _ => plan.map_children(|child| {
+            let live = all_columns(child.schema());
+            rewrite_subtree(child, live, false)
+        }),
     }
+}
+
+/// Recurses into a single-input node's child, threading `child_live` and
+/// `duplicate_insensitive` down, then rebuilds the node from the (possibly
+/// rewritten) child via `rebuild`. The child's `Transformed` flag is 
preserved,
+/// so the node is reported as changed exactly when its child changed.
+fn rewrite_single_input<F>(
+    input: Arc<LogicalPlan>,
+    child_live: LiveColumns,
+    duplicate_insensitive: bool,
+    rebuild: F,
+) -> Result<Transformed<LogicalPlan>>
+where
+    F: FnOnce(Arc<LogicalPlan>) -> Result<LogicalPlan>,
+{
+    rewrite_subtree(
+        Arc::unwrap_or_clone(input),
+        child_live,
+        duplicate_insensitive,
+    )?
+    .map_data(|input| rebuild(Arc::new(input)))
+}
+
+fn rewrite_join(
+    join: Join,
+    live: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    if join.join_type == JoinType::Inner
+        && join.on.is_empty()
+        && matches!(
+            join.filter.as_ref(),
+            Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _))
+        )
+    {
+        return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
+            EmptyRelation {
+                produce_one_row: false,
+                schema: join.schema,
+            },
+        )));
+    }
+
+    let (visible_left, visible_right) = split_join_output_columns(&join, live);
+
+    let rewritten_join_type =
+        rewritten_join_type(&join, &visible_left, &visible_right, 
duplicate_insensitive);
+
+    let (mut left_live, mut right_live) = match rewritten_join_type {
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (visible_left, LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), visible_right)
+        }
+        _ => (visible_left, visible_right),
+    };
+
+    add_join_condition_columns(&join, &mut left_live, &mut right_live)?;
+
+    let (left_dup_insensitive, right_dup_insensitive) =
+        child_duplicate_insensitivity(rewritten_join_type, 
duplicate_insensitive);
 
-    fn supports_rewrite(&self) -> bool {
-        true
+    let left = rewrite_subtree(
+        Arc::unwrap_or_clone(join.left),
+        left_live,
+        left_dup_insensitive,
+    )?;
+    let right = rewrite_subtree(
+        Arc::unwrap_or_clone(join.right),
+        right_live,
+        right_dup_insensitive,
+    )?;
+
+    let changed =
+        left.transformed || right.transformed || rewritten_join_type != 
join.join_type;
+    let left = Arc::new(left.data);
+    let right = Arc::new(right.data);
+
+    if changed {
+        // The join type or an input changed, so the output schema may have
+        // narrowed; recompute it via `try_new`.
+        Ok(Transformed::yes(LogicalPlan::Join(Join::try_new(
+            left,
+            right,
+            join.on,
+            join.filter,
+            rewritten_join_type,
+            join.join_constraint,
+            join.null_equality,
+            join.null_aware,
+        )?)))
+    } else {
+        // Nothing changed; reassemble the join reusing its existing schema 
rather
+        // than recomputing it.
+        Ok(Transformed::no(LogicalPlan::Join(Join {
+            left,
+            right,
+            on: join.on,
+            filter: join.filter,
+            join_type: join.join_type,
+            join_constraint: join.join_constraint,
+            schema: join.schema,
+            null_equality: join.null_equality,
+            null_aware: join.null_aware,
+        })))
+    }
+}
+
+/// Returns which join inputs can safely ignore duplicate rows from their own
+/// descendants. For semi/anti/mark joins, duplicates from the existence side 
do
+/// not change the result even when the parent itself is duplicate-sensitive.
+fn child_duplicate_insensitivity(
+    join_type: JoinType,
+    duplicate_insensitive: bool,
+) -> (bool, bool) {
+    match join_type {
+        JoinType::Inner => (duplicate_insensitive, duplicate_insensitive),
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (duplicate_insensitive, true)
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (true, duplicate_insensitive)
+        }
+        JoinType::Left | JoinType::Right | JoinType::Full => (false, false),
     }
 }
 
+/// Rewrites an inner join to a semi join when the removed side has no
+/// parent-visible columns and either the parent ignores duplicate output rows 
or
+/// the removed side is unique on the join keys.
+fn rewritten_join_type(
+    join: &Join,
+    visible_left: &LiveColumns,
+    visible_right: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> JoinType {
+    if join.join_type != JoinType::Inner || join.on.is_empty() {
+        return join.join_type;
+    }
+
+    let can_remove_right = duplicate_insensitive
+        || side_unique_on_join(
+            join.right.schema(),
+            join.on.iter().map(|(_, right)| right),
+            join.null_equality,
+        );
+    if visible_right.is_empty() && can_remove_right {
+        return JoinType::LeftSemi;
+    }
+
+    let can_remove_left = duplicate_insensitive
+        || side_unique_on_join(
+            join.left.schema(),
+            join.on.iter().map(|(left, _)| left),
+            join.null_equality,
+        );
+    if visible_left.is_empty() && can_remove_left {
+        return JoinType::RightSemi;
+    }
+
+    JoinType::Inner
+}
+
+fn add_join_condition_columns(
+    join: &Join,
+    left_live: &mut LiveColumns,
+    right_live: &mut LiveColumns,
+) -> Result<()> {
+    extend_live_columns(
+        left_live,
+        join.on.iter().map(|(l, _)| l),
+        join.left.schema(),
+    )?;
+    extend_live_columns(
+        right_live,
+        join.on.iter().map(|(_, r)| r),
+        join.right.schema(),
+    )?;
+
+    if let Some(filter) = &join.filter {
+        extend_live_columns(left_live, [filter], join.left.schema())?;
+        extend_live_columns(right_live, [filter], join.right.schema())?;
+    }
+
+    Ok(())
+}
+
+fn split_join_output_columns(
+    join: &Join,
+    live: &LiveColumns,
+) -> (LiveColumns, LiveColumns) {
+    let left_len = join.left.schema().fields().len();
+    match join.join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            split_columns_at(live, left_len)
+        }
+        // A semi/anti/mark join outputs only the surviving side's columns, 
with
+        // the same index space, so `live` passes straight through to that 
side.
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (live.clone(), LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), live.clone())
+        }
+    }
+}
+
+fn split_columns_at(live: &LiveColumns, left_len: usize) -> (LiveColumns, 
LiveColumns) {
+    let mut left = LiveColumns::new();
+    let mut right = LiveColumns::new();
+    for idx in live {
+        if *idx < left_len {
+            left.insert(*idx);
+        } else {
+            right.insert(*idx - left_len);
+        }
+    }
+    (left, right)
+}
+
+fn side_unique_on_join<'a>(
+    schema: &DFSchema,
+    join_exprs: impl Iterator<Item = &'a Expr>,
+    null_equality: NullEquality,
+) -> bool {
+    let join_key_indices = join_exprs
+        .filter_map(|expr| match expr {
+            Expr::Alias(alias) => alias.expr.as_ref().try_as_col(),
+            _ => expr.try_as_col(),
+        })
+        .filter_map(|column| schema.maybe_index_of_column(column))
+        .collect::<Vec<usize>>();
+
+    schema.functional_dependencies().iter().any(|dependency| {
+        dependency.mode == Dependency::Single
+            && (!dependency.nullable || null_equality == 
NullEquality::NullEqualsNothing)
+            && dependency
+                .source_indices
+                .iter()
+                .all(|idx| join_key_indices.contains(idx))
+    })
+}
+
+fn all_columns(schema: &DFSchema) -> LiveColumns {
+    (0..schema.fields().len()).collect()
+}
+
+/// The columns of `schema` referenced by any of `exprs`.
+fn live_columns<'a>(
+    exprs: impl IntoIterator<Item = &'a Expr>,
+    schema: &DFSchema,
+) -> Result<LiveColumns> {
+    let mut live = LiveColumns::new();
+    extend_live_columns(&mut live, exprs, schema)?;
+    Ok(live)
+}
+
+/// Inserts into `live` the index, within `schema`, of every column referenced
+/// by any of `exprs`.
+fn extend_live_columns<'a>(

Review Comment:
   Another good candidate for a `LiveColumns` method, perhaps.



##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        // The only arm that rewrites a join; the rest just thread context 
down to one.
+        LogicalPlan::Join(join) => rewrite_join(join, &live, 
duplicate_insensitive),
+        LogicalPlan::Projection(Projection {
+            expr,
+            input,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the columns the projection's expressions 
reference.
+            let child_live = live_columns(&expr, input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr, input, schema,
+                )?))
+            })
+        }
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            // Adds the predicate's columns to `live` (a side used only by the 
filter stays live).
+            let mut child_live = live;
+            extend_live_columns(&mut child_live, [&predicate], 
input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+            })
+        }
+        LogicalPlan::Aggregate(Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the grouping and aggregate expressions' 
columns.
+            let child_live =
+                live_columns(group_expr.iter().chain(&aggr_expr), 
input.schema())?;
+
+            // A grouping aggregate with no aggregate functions (`GROUP BY` 
with
+            // an empty `aggr_expr`) only observes which group-key values 
exist,
+            // not how many rows produced them, so its input is duplicate-
+            // insensitive.
+            let child_duplicate_insensitive =
+                !group_expr.is_empty() && aggr_expr.is_empty();
+
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| {
+                    Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                        input, group_expr, aggr_expr, schema,
+                    )?))
+                },
+            )
+        }
+        LogicalPlan::Distinct(Distinct::All(input)) => {

Review Comment:
   I think some of this code is not covered by tests. I generated a coverage report with:
   
   ```shell
   cargo llvm-cov --html  test --profile=ci -p datafusion-optimizer
   cargo llvm-cov --html  test --profile=ci --test sqllogictests
   ```
   
   <img width="1158" height="982" alt="Image" src="https://github.com/user-attachments/assets/71bb3a23-b968-4a69-8836-ad1a68b42066" />



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to