This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7219744567 Minor: Simplify + document `EliminateCrossJoin` better (#10427)
7219744567 is described below

commit 721974456742bf44bdae291b3114bc23fe478bcd
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu May 9 12:57:14 2024 -0400

    Minor: Simplify + document `EliminateCrossJoin` better (#10427)
---
 datafusion/expr/src/logical_plan/builder.rs        |  4 +-
 datafusion/expr/src/utils.rs                       | 10 ++---
 datafusion/optimizer/src/eliminate_cross_join.rs   | 43 +++++++++++++++-------
 .../optimizer/src/extract_equijoin_predicate.rs    | 14 +++----
 4 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 7b1e449801..3f15b84784 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1085,8 +1085,8 @@ impl LogicalPlanBuilder {
                 find_valid_equijoin_key_pair(
                         &normalized_left_key,
                         &normalized_right_key,
-                        self.plan.schema().clone(),
-                        right.schema().clone(),
+                        self.plan.schema(),
+                        right.schema(),
                     )?.ok_or_else(||
                         plan_datafusion_err!(
                             "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 4282952a1e..0c1084674d 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -885,7 +885,7 @@ pub fn can_hash(data_type: &DataType) -> bool {
 /// Check whether all columns are from the schema.
 pub fn check_all_columns_from_schema(
     columns: &HashSet<Column>,
-    schema: DFSchemaRef,
+    schema: &DFSchema,
 ) -> Result<bool> {
     for col in columns.iter() {
         let exist = schema.is_column_from_schema(col);
@@ -909,8 +909,8 @@ pub fn check_all_columns_from_schema(
 pub fn find_valid_equijoin_key_pair(
     left_key: &Expr,
     right_key: &Expr,
-    left_schema: DFSchemaRef,
-    right_schema: DFSchemaRef,
+    left_schema: &DFSchema,
+    right_schema: &DFSchema,
 ) -> Result<Option<(Expr, Expr)>> {
     let left_using_columns = left_key.to_columns()?;
     let right_using_columns = right_key.to_columns()?;
@@ -920,8 +920,8 @@ pub fn find_valid_equijoin_key_pair(
         return Ok(None);
     }
 
-    if check_all_columns_from_schema(&left_using_columns, left_schema.clone())?
-        && check_all_columns_from_schema(&right_using_columns, right_schema.clone())?
+    if check_all_columns_from_schema(&left_using_columns, left_schema)?
+        && check_all_columns_from_schema(&right_using_columns, right_schema)?
     {
         return Ok(Some((left_key.clone(), right_key.clone())));
     } else if check_all_columns_from_schema(&right_using_columns, left_schema)?
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index ae6c1b339d..a807ee5ff2 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -107,7 +107,7 @@ impl OptimizerRule for EliminateCrossJoin {
             left = find_inner_join(
                 &left,
                 &mut all_inputs,
-                &mut possible_join_keys,
+                &possible_join_keys,
                 &mut all_join_keys,
             )?;
         }
@@ -144,7 +144,9 @@ impl OptimizerRule for EliminateCrossJoin {
     }
 }
 
-/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins).
+/// Recursively accumulate possible_join_keys and inputs from inner joins
+/// (including cross joins).
+///
 /// Returns a boolean indicating whether the flattening was successful.
 fn try_flatten_join_inputs(
     plan: &LogicalPlan,
@@ -159,14 +161,10 @@ fn try_flatten_join_inputs(
                 return Ok(false);
             }
             possible_join_keys.extend(join.on.clone());
-            let left = &*(join.left);
-            let right = &*(join.right);
-            vec![left, right]
+            vec![&join.left, &join.right]
         }
         LogicalPlan::CrossJoin(join) => {
-            let left = &*(join.left);
-            let right = &*(join.right);
-            vec![left, right]
+            vec![&join.left, &join.right]
         }
         _ => {
             return plan_err!("flatten_join_inputs just can call join/cross_join");
@@ -174,7 +172,8 @@ fn try_flatten_join_inputs(
     };
 
     for child in children.iter() {
-        match *child {
+        let child = child.as_ref();
+        match child {
             LogicalPlan::Join(Join {
                 join_type: JoinType::Inner,
                 ..
@@ -184,27 +183,39 @@ fn try_flatten_join_inputs(
                     return Ok(false);
                 }
             }
-            _ => all_inputs.push((*child).clone()),
+            _ => all_inputs.push(child.clone()),
         }
     }
     Ok(true)
 }
 
+/// Finds the next to join with the left input plan,
+///
+/// Finds the next `right` from `rights` that can be joined with `left_input`
+/// plan based on the join keys in `possible_join_keys`.
+///
+/// If such a matching `right` is found:
+/// 1. Adds the matching join keys to `all_join_keys`.
+/// 2. Returns `left_input JOIN right ON (all join keys)`.
+///
+/// If no matching `right` is found:
+/// 1. Removes the first plan from `rights`
+/// 2. Returns `left_input CROSS JOIN right`.
 fn find_inner_join(
     left_input: &LogicalPlan,
     rights: &mut Vec<LogicalPlan>,
-    possible_join_keys: &mut Vec<(Expr, Expr)>,
+    possible_join_keys: &[(Expr, Expr)],
     all_join_keys: &mut HashSet<(Expr, Expr)>,
 ) -> Result<LogicalPlan> {
     for (i, right_input) in rights.iter().enumerate() {
         let mut join_keys = vec![];
 
-        for (l, r) in &mut *possible_join_keys {
+        for (l, r) in possible_join_keys.iter() {
             let key_pair = find_valid_equijoin_key_pair(
                 l,
                 r,
-                left_input.schema().clone(),
-                right_input.schema().clone(),
+                left_input.schema(),
+                right_input.schema(),
             )?;
 
             // Save join keys
@@ -215,6 +226,7 @@ fn find_inner_join(
             }
         }
 
+        // Found one or more matching join keys
         if !join_keys.is_empty() {
             all_join_keys.extend(join_keys.clone());
             let right_input = rights.remove(i);
@@ -236,6 +248,9 @@ fn find_inner_join(
             }));
         }
     }
+
+    // no matching right plan had any join keys, cross join with the first right
+    // plan
     let right = rights.remove(0);
     let join_schema = Arc::new(build_join_schema(
         left_input.schema(),
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index c47a86974c..237c003524 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -24,7 +24,6 @@ use datafusion_common::{internal_err, DFSchema};
 use datafusion_expr::utils::split_conjunction_owned;
 use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
 use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
-use std::sync::Arc;
 // equijoin predicate
 type EquijoinPredicate = (Expr, Expr);
 
@@ -122,8 +121,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {
 
 fn split_eq_and_noneq_join_predicate(
     filter: Expr,
-    left_schema: &Arc<DFSchema>,
-    right_schema: &Arc<DFSchema>,
+    left_schema: &DFSchema,
+    right_schema: &DFSchema,
 ) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
     let exprs = split_conjunction_owned(filter);
 
@@ -136,12 +135,8 @@ fn split_eq_and_noneq_join_predicate(
                 op: Operator::Eq,
                 ref right,
             }) => {
-                let join_key_pair = find_valid_equijoin_key_pair(
-                    left,
-                    right,
-                    left_schema.clone(),
-                    right_schema.clone(),
-                )?;
+                let join_key_pair =
+                    find_valid_equijoin_key_pair(left, right, left_schema, right_schema)?;
 
                 if let Some((left_expr, right_expr)) = join_key_pair {
                     let left_expr_type = left_expr.get_type(left_schema)?;
@@ -172,6 +167,7 @@ mod tests {
     use datafusion_expr::{
         col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType,
     };
+    use std::sync::Arc;
 
     fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
         assert_optimized_plan_eq_display_indent(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to