neilconway commented on code in PR #22652:
URL: https://github.com/apache/datafusion/pull/22652#discussion_r3367428003


##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
         "eliminate_join"
     }
 
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
-
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        match plan {
-            LogicalPlan::Join(join) if join.join_type == Inner && 
join.on.is_empty() => {
-                match join.filter {
-                    Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) 
=> Ok(
-                        
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
-                            produce_one_row: false,
-                            schema: join.schema,
-                        })),
-                    ),
-                    _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+        let live = all_columns(plan.schema());
+        rewrite_subtree(plan, live, false)
+    }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions.  Each subquery is seeded as a fresh root, since its columns 
are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+        plan.map_subqueries(|subquery| {
+            let live = all_columns(subquery.schema());
+            rewrite_subtree(subquery, live, false)
+        })
+    })
+}
+
+fn rewrite_node(
+    plan: LogicalPlan,
+    live: LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    match plan {
+        // The only arm that rewrites a join; the rest just thread context 
down to one.
+        LogicalPlan::Join(join) => rewrite_join(join, &live, 
duplicate_insensitive),
+        LogicalPlan::Projection(Projection {
+            expr,
+            input,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the columns the projection's expressions 
reference.
+            let child_live = live_columns(&expr, input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+                    expr, input, schema,
+                )?))
+            })
+        }
+        LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) => {
+            // Adds the predicate's columns to `live` (a side used only by the 
filter stays live).
+            let mut child_live = live;
+            extend_live_columns(&mut child_live, [&predicate], 
input.schema())?;
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+            })
+        }
+        LogicalPlan::Aggregate(Aggregate {
+            input,
+            group_expr,
+            aggr_expr,
+            schema,
+            ..
+        }) => {
+            // Narrows `live` to the grouping and aggregate expressions' 
columns.
+            let child_live =
+                live_columns(group_expr.iter().chain(&aggr_expr), 
input.schema())?;
+
+            // A grouping aggregate with no aggregate functions (`GROUP BY` 
with
+            // an empty `aggr_expr`) only observes which group-key values 
exist,
+            // not how many rows produced them, so its input is duplicate-
+            // insensitive.
+            let child_duplicate_insensitive =
+                !group_expr.is_empty() && aggr_expr.is_empty();
+
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| {
+                    Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+                        input, group_expr, aggr_expr, schema,
+                    )?))
+                },
+            )
+        }
+        LogicalPlan::Distinct(Distinct::All(input)) => {
+            // `SELECT DISTINCT *` is equivalent to a no-aggregate `GROUP BY`
+            // over every input column, so the input is duplicate-insensitive,
+            // but every column is part of the dedup key.
+            let child_live = all_columns(input.schema());
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::All(input)))
+            })
+        }
+        LogicalPlan::Distinct(Distinct::On(DistinctOn {
+            on_expr,
+            select_expr,
+            sort_expr,
+            input,
+            schema,
+        })) => {
+            // `DISTINCT ON (on) select [ORDER BY sort]` is a no-aggregate
+            // `GROUP BY` on the columns it reads, so its input is duplicate-
+            // insensitive; the live columns are exactly those of the
+            // ON/SELECT/ORDER BY expressions.
+            let mut child_live =
+                live_columns(on_expr.iter().chain(&select_expr), 
input.schema())?;
+            if let Some(sort_expr) = &sort_expr {
+                extend_live_columns(
+                    &mut child_live,
+                    sort_expr.iter().map(|s| &s.expr),
+                    input.schema(),
+                )?;
+            }
+
+            rewrite_single_input(input, child_live, true, |input| {
+                Ok(LogicalPlan::Distinct(Distinct::On(DistinctOn {
+                    on_expr,
+                    select_expr,
+                    sort_expr,
+                    input,
+                    schema,
+                })))
+            })
+        }
+        LogicalPlan::Sort(Sort { expr, input, fetch }) => {
+            // Adds the sort-key columns to `live`.
+            let mut child_live = live;
+            extend_live_columns(
+                &mut child_live,
+                expr.iter().map(|s| &s.expr),
+                input.schema(),
+            )?;
+
+            // A `fetch` (top-N) makes the row count observable, so duplicate-
+            // insensitivity does not survive past it.
+            let child_duplicate_insensitive = duplicate_insensitive && 
fetch.is_none();
+            rewrite_single_input(
+                input,
+                child_live,
+                child_duplicate_insensitive,
+                |input| Ok(LogicalPlan::Sort(Sort { expr, input, fetch })),
+            )
+        }
+        LogicalPlan::Limit(Limit { skip, fetch, input }) => {
+            // LIMIT makes the row count observable, so it clears 
duplicate-insensitivity.
+            rewrite_single_input(input, live, false, |input| {
+                Ok(LogicalPlan::Limit(Limit { skip, fetch, input }))
+            })
+        }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            // Re-aliases columns 1:1, so `live` and duplicate-sensitivity 
pass through unchanged.
+            rewrite_single_input(input, live, duplicate_insensitive, |input| {
+                Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
+                    input, alias,
+                )?))
+            })
+        }
+        LogicalPlan::Repartition(Repartition {
+            input,
+            partitioning_scheme,
+        }) => {
+            // Adds any partitioning-key columns to `live`; 
duplicate-sensitivity is unchanged.
+            let mut child_live = live;
+            match &partitioning_scheme {
+                Partitioning::Hash(exprs, _) | 
Partitioning::DistributeBy(exprs) => {
+                    extend_live_columns(&mut child_live, exprs, 
input.schema())?;
                 }
+                Partitioning::RoundRobinBatch(_) => {}
             }
-            _ => Ok(Transformed::no(plan)),
+            rewrite_single_input(input, child_live, duplicate_insensitive, 
|input| {
+                Ok(LogicalPlan::Repartition(Repartition {
+                    input,
+                    partitioning_scheme,
+                }))
+            })
         }
+        // Conservatively treat any other plan node as a fresh root, since we 
are
+        // not sure of its semantics with respect to duplicates or live 
columns.
+        _ => plan.map_children(|child| {
+            let live = all_columns(child.schema());
+            rewrite_subtree(child, live, false)
+        }),
     }
+}
+
+/// Recurses into a single-input node's child, threading `child_live` and
+/// `duplicate_insensitive` down, then rebuilds the node from the (possibly
+/// rewritten) child via `rebuild`. The child's `Transformed` flag is 
preserved,
+/// so the node is reported as changed exactly when its child changed.
+fn rewrite_single_input<F>(
+    input: Arc<LogicalPlan>,
+    child_live: LiveColumns,
+    duplicate_insensitive: bool,
+    rebuild: F,
+) -> Result<Transformed<LogicalPlan>>
+where
+    F: FnOnce(Arc<LogicalPlan>) -> Result<LogicalPlan>,
+{
+    rewrite_subtree(
+        Arc::unwrap_or_clone(input),
+        child_live,
+        duplicate_insensitive,
+    )?
+    .map_data(|input| rebuild(Arc::new(input)))
+}
+
+fn rewrite_join(
+    join: Join,
+    live: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+    if join.join_type == JoinType::Inner
+        && join.on.is_empty()
+        && matches!(
+            join.filter.as_ref(),
+            Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _))
+        )
+    {
+        return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
+            EmptyRelation {
+                produce_one_row: false,
+                schema: join.schema,
+            },
+        )));
+    }
+
+    let (visible_left, visible_right) = split_join_output_columns(&join, live);
+
+    let rewritten_join_type =
+        rewritten_join_type(&join, &visible_left, &visible_right, 
duplicate_insensitive);
+
+    let (mut left_live, mut right_live) = match rewritten_join_type {
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (visible_left, LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), visible_right)
+        }
+        _ => (visible_left, visible_right),
+    };
+
+    add_join_condition_columns(&join, &mut left_live, &mut right_live)?;
+
+    let (left_dup_insensitive, right_dup_insensitive) =
+        child_duplicate_insensitivity(rewritten_join_type, 
duplicate_insensitive);
 
-    fn supports_rewrite(&self) -> bool {
-        true
+    let left = rewrite_subtree(
+        Arc::unwrap_or_clone(join.left),
+        left_live,
+        left_dup_insensitive,
+    )?;
+    let right = rewrite_subtree(
+        Arc::unwrap_or_clone(join.right),
+        right_live,
+        right_dup_insensitive,
+    )?;
+
+    let changed =
+        left.transformed || right.transformed || rewritten_join_type != 
join.join_type;
+    let left = Arc::new(left.data);
+    let right = Arc::new(right.data);
+
+    if changed {
+        // The join type or an input changed, so the output schema may have
+        // narrowed; recompute it via `try_new`.
+        Ok(Transformed::yes(LogicalPlan::Join(Join::try_new(
+            left,
+            right,
+            join.on,
+            join.filter,
+            rewritten_join_type,
+            join.join_constraint,
+            join.null_equality,
+            join.null_aware,
+        )?)))
+    } else {
+        // Nothing changed; reassemble the join reusing its existing schema 
rather
+        // than recomputing it.
+        Ok(Transformed::no(LogicalPlan::Join(Join {
+            left,
+            right,
+            on: join.on,
+            filter: join.filter,
+            join_type: join.join_type,
+            join_constraint: join.join_constraint,
+            schema: join.schema,
+            null_equality: join.null_equality,
+            null_aware: join.null_aware,
+        })))
+    }
+}
+
+/// Returns which join inputs can safely ignore duplicate rows from their own
+/// descendants. For semi/anti/mark joins, duplicates from the existence side 
do
+/// not change the result even when the parent itself is duplicate-sensitive.
+fn child_duplicate_insensitivity(
+    join_type: JoinType,
+    duplicate_insensitive: bool,
+) -> (bool, bool) {
+    match join_type {
+        JoinType::Inner => (duplicate_insensitive, duplicate_insensitive),
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (duplicate_insensitive, true)
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (true, duplicate_insensitive)
+        }
+        JoinType::Left | JoinType::Right | JoinType::Full => (false, false),
     }
 }
 
+/// Rewrites an inner join to a semi join when the removed side has no
+/// parent-visible columns and either the parent ignores duplicate output rows 
or
+/// the removed side is unique on the join keys.
+fn rewritten_join_type(
+    join: &Join,
+    visible_left: &LiveColumns,
+    visible_right: &LiveColumns,
+    duplicate_insensitive: bool,
+) -> JoinType {
+    if join.join_type != JoinType::Inner || join.on.is_empty() {
+        return join.join_type;
+    }
+
+    let can_remove_right = duplicate_insensitive
+        || side_unique_on_join(
+            join.right.schema(),
+            join.on.iter().map(|(_, right)| right),
+            join.null_equality,
+        );
+    if visible_right.is_empty() && can_remove_right {
+        return JoinType::LeftSemi;
+    }
+
+    let can_remove_left = duplicate_insensitive
+        || side_unique_on_join(
+            join.left.schema(),
+            join.on.iter().map(|(left, _)| left),
+            join.null_equality,
+        );
+    if visible_left.is_empty() && can_remove_left {
+        return JoinType::RightSemi;
+    }
+
+    JoinType::Inner
+}
+
+fn add_join_condition_columns(
+    join: &Join,
+    left_live: &mut LiveColumns,
+    right_live: &mut LiveColumns,
+) -> Result<()> {
+    extend_live_columns(
+        left_live,
+        join.on.iter().map(|(l, _)| l),
+        join.left.schema(),
+    )?;
+    extend_live_columns(
+        right_live,
+        join.on.iter().map(|(_, r)| r),
+        join.right.schema(),
+    )?;
+
+    if let Some(filter) = &join.filter {
+        extend_live_columns(left_live, [filter], join.left.schema())?;
+        extend_live_columns(right_live, [filter], join.right.schema())?;
+    }
+
+    Ok(())
+}
+
+fn split_join_output_columns(
+    join: &Join,
+    live: &LiveColumns,
+) -> (LiveColumns, LiveColumns) {
+    let left_len = join.left.schema().fields().len();
+    match join.join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
+            split_columns_at(live, left_len)
+        }
+        // A semi/anti/mark join outputs only the surviving side's columns, 
with
+        // the same index space, so `live` passes straight through to that 
side.
+        JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+            (live.clone(), LiveColumns::new())
+        }
+        JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+            (LiveColumns::new(), live.clone())
+        }
+    }
+}
+
+fn split_columns_at(live: &LiveColumns, left_len: usize) -> (LiveColumns, 
LiveColumns) {
+    let mut left = LiveColumns::new();
+    let mut right = LiveColumns::new();
+    for idx in live {
+        if *idx < left_len {
+            left.insert(*idx);
+        } else {
+            right.insert(*idx - left_len);
+        }
+    }
+    (left, right)
+}
+
+fn side_unique_on_join<'a>(

Review Comment:
   I think it would be a bit awkward as a method on `Join`, and I couldn't find 
other potential callers in the tree right now. That said, there is probably 
some opportunity to refactor some of the logic here, possibly as part of the 
`FunctionalDependencies` or `DFSchema` code. I would prefer to defer that to a 
separate PR, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to