neilconway commented on code in PR #22652:
URL: https://github.com/apache/datafusion/pull/22652#discussion_r3367332265
##########
datafusion/optimizer/src/eliminate_join.rs:
##########
@@ -42,44 +92,485 @@ impl OptimizerRule for EliminateJoin {
"eliminate_join"
}
- fn apply_order(&self) -> Option<ApplyOrder> {
- Some(ApplyOrder::TopDown)
- }
-
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
- match plan {
- LogicalPlan::Join(join) if join.join_type == Inner && join.on.is_empty() => {
- match join.filter {
- Some(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) => Ok(
- Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
- produce_one_row: false,
- schema: join.schema,
- })),
- ),
- _ => Ok(Transformed::no(LogicalPlan::Join(join))),
+ let live = all_columns(plan.schema());
+ rewrite_subtree(plan, live, false)
+ }
+}
+
+/// Rewrites `plan` and everything below it, including joins nested inside
+/// subquery expressions.
+///
+/// [`rewrite_node`] handles the node itself and recurses into its plan
+/// children; this wrapper additionally descends into the node's own subquery
+/// expressions. Each subquery is seeded as a fresh root, since its columns are
+/// independent of the enclosing plan's `live` set.
+fn rewrite_subtree(
+ plan: LogicalPlan,
+ live: LiveColumns,
+ duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+ rewrite_node(plan, live, duplicate_insensitive)?.transform_data(|plan| {
+ plan.map_subqueries(|subquery| {
+ let live = all_columns(subquery.schema());
+ rewrite_subtree(subquery, live, false)
+ })
+ })
+}
+
+fn rewrite_node(
+ plan: LogicalPlan,
+ live: LiveColumns,
+ duplicate_insensitive: bool,
+) -> Result<Transformed<LogicalPlan>> {
+ match plan {
+ // The only arm that rewrites a join; the rest just thread context down to one.
+ LogicalPlan::Join(join) => rewrite_join(join, &live, duplicate_insensitive),
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema,
+ ..
+ }) => {
+ // Narrows `live` to the columns the projection's expressions reference.
+ let child_live = live_columns(&expr, input.schema())?;
+ rewrite_single_input(input, child_live, duplicate_insensitive, |input| {
+ Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
+ expr, input, schema,
+ )?))
+ })
+ }
+ LogicalPlan::Filter(Filter {
+ predicate, input, ..
+ }) => {
+ // Adds the predicate's columns to `live` (a side used only by the filter stays live).
+ let mut child_live = live;
+ extend_live_columns(&mut child_live, [&predicate], input.schema())?;
+ rewrite_single_input(input, child_live, duplicate_insensitive, |input| {
+ Ok(LogicalPlan::Filter(Filter::new(predicate, input)))
+ })
+ }
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ schema,
+ ..
+ }) => {
+ // Narrows `live` to the grouping and aggregate expressions' columns.
+ let child_live =
+ live_columns(group_expr.iter().chain(&aggr_expr), input.schema())?;
+
+ // A grouping aggregate with no aggregate functions (`GROUP BY` with
+ // an empty `aggr_expr`) only observes which group-key values exist,
+ // not how many rows produced them, so its input is duplicate-
+ // insensitive.
+ let child_duplicate_insensitive =
+ !group_expr.is_empty() && aggr_expr.is_empty();
+
+ rewrite_single_input(
+ input,
+ child_live,
+ child_duplicate_insensitive,
+ |input| {
+ Ok(LogicalPlan::Aggregate(Aggregate::try_new_with_schema(
+ input, group_expr, aggr_expr, schema,
+ )?))
+ },
+ )
+ }
+ LogicalPlan::Distinct(Distinct::All(input)) => {
Review Comment:
Thanks! I added some more tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]