This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 74c7e52377 Refactor EliminateOuterJoin to use `rewrite()` (#10081)
74c7e52377 is described below

commit 74c7e52377e8fd513356514949f3ce377cbc3e1f
Author: Peter Toth <[email protected]>
AuthorDate: Mon Apr 15 11:52:32 2024 +0200

    Refactor EliminateOuterJoin to use `rewrite()` (#10081)
---
 datafusion/optimizer/src/eliminate_outer_join.rs   | 71 +++++++++++++---------
 .../src/simplify_expressions/simplify_exprs.rs     |  2 +-
 2 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index 63b8b887bb..c3c5d80922 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -17,11 +17,12 @@
 
 //! [`EliminateOuterJoin`] converts `LEFT/RIGHT/FULL` joins to `INNER` joins
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{Column, DFSchema, Result};
+use datafusion_common::{internal_err, Column, DFSchema, Result};
 use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan};
-use datafusion_expr::{Expr, Operator};
+use datafusion_expr::{Expr, Filter, Operator};
 
 use crate::optimizer::ApplyOrder;
+use datafusion_common::tree_node::Transformed;
 use datafusion_expr::expr::{BinaryExpr, Cast, TryCast};
 use std::sync::Arc;
 
@@ -61,9 +62,29 @@ impl EliminateOuterJoin {
 impl OptimizerRule for EliminateOuterJoin {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
+        _plan: &LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called EliminateOuterJoin::rewrite")
+    }
+
+    fn name(&self) -> &str {
+        "eliminate_outer_join"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
         match plan {
             LogicalPlan::Filter(filter) => match filter.input.as_ref() {
                 LogicalPlan::Join(join) => {
@@ -75,7 +96,7 @@ impl OptimizerRule for EliminateOuterJoin {
                         join.left.schema(),
                         join.right.schema(),
                         true,
-                    )?;
+                    );
 
                     let new_join_type = if join.join_type.is_outer() {
                         let mut left_non_nullable = false;
@@ -96,7 +117,7 @@ impl OptimizerRule for EliminateOuterJoin {
                     } else {
                         join.join_type
                     };
-                    let new_join = LogicalPlan::Join(Join {
+                    let new_join = Arc::new(LogicalPlan::Join(Join {
                         left: Arc::new((*join.left).clone()),
                         right: Arc::new((*join.right).clone()),
                         join_type: new_join_type,
@@ -105,23 +126,15 @@ impl OptimizerRule for EliminateOuterJoin {
                         filter: join.filter.clone(),
                         schema: join.schema.clone(),
                         null_equals_null: join.null_equals_null,
-                    });
-                    let exprs = plan.expressions();
-                    plan.with_new_exprs(exprs, vec![new_join]).map(Some)
+                    }));
+                    Filter::try_new(filter.predicate, new_join)
+                        .map(|f| Transformed::yes(LogicalPlan::Filter(f)))
                 }
-                _ => Ok(None),
+                _ => Ok(Transformed::no(LogicalPlan::Filter(filter))),
             },
-            _ => Ok(None),
+            _ => Ok(Transformed::no(plan)),
         }
     }
-
-    fn name(&self) -> &str {
-        "eliminate_outer_join"
-    }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
 }
 
 pub fn eliminate_outer(
@@ -169,11 +182,10 @@ fn extract_non_nullable_columns(
     left_schema: &Arc<DFSchema>,
     right_schema: &Arc<DFSchema>,
     top_level: bool,
-) -> Result<()> {
+) {
     match expr {
         Expr::Column(col) => {
             non_nullable_cols.push(col.clone());
-            Ok(())
         }
         Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
             // If one of the inputs are null for these operators, the results 
should be false.
@@ -189,7 +201,7 @@ fn extract_non_nullable_columns(
                     left_schema,
                     right_schema,
                     false,
-                )?;
+                );
                 extract_non_nullable_columns(
                     right,
                     non_nullable_cols,
@@ -208,15 +220,15 @@ fn extract_non_nullable_columns(
                         left_schema,
                         right_schema,
                         top_level,
-                    )?;
+                    );
                     extract_non_nullable_columns(
                         right,
                         non_nullable_cols,
                         left_schema,
                         right_schema,
                         top_level,
-                    )?;
-                    return Ok(());
+                    );
+                    return;
                 }
 
                 let mut left_non_nullable_cols: Vec<Column> = vec![];
@@ -228,14 +240,14 @@ fn extract_non_nullable_columns(
                     left_schema,
                     right_schema,
                     top_level,
-                )?;
+                );
                 extract_non_nullable_columns(
                     right,
                     &mut right_non_nullable_cols,
                     left_schema,
                     right_schema,
                     top_level,
-                )?;
+                );
 
                 // for query: select *** from a left join b where b.c1 ... or 
b.c2 ...
                 // this can be eliminated to inner join.
@@ -259,9 +271,8 @@ fn extract_non_nullable_columns(
                         }
                     }
                 }
-                Ok(())
             }
-            _ => Ok(()),
+            _ => {}
         },
         Expr::Not(arg) => extract_non_nullable_columns(
             arg,
@@ -272,7 +283,7 @@ fn extract_non_nullable_columns(
         ),
         Expr::IsNotNull(arg) => {
             if !top_level {
-                return Ok(());
+                return;
             }
             extract_non_nullable_columns(
                 arg,
@@ -290,7 +301,7 @@ fn extract_non_nullable_columns(
             right_schema,
             false,
         ),
-        _ => Ok(()),
+        _ => {}
     }
 }
 
diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
index 17312fa654..5b0314af20 100644
--- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
+++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs
@@ -53,7 +53,7 @@ impl OptimizerRule for SimplifyExpressions {
         _plan: &LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
-        internal_err!("Should have called 
SimplifyExpressions::try_optimize_owned")
+        internal_err!("Should have called SimplifyExpressions::rewrite")
     }
 
     fn name(&self) -> &str {

Reply via email to