This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3698693fab Filter pushdown into cross join (#8626)
3698693fab is described below

commit 3698693fab040dfb077edaf763b6935e9f42ea06
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Dec 25 10:43:52 2023 +0300

    Filter pushdown into cross join (#8626)
    
    * Initial commit
    
    * Simplifications
    
    * Review
    
    * Review Part 2
    
    * More idiomatic Rust
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/optimizer/src/eliminate_cross_join.rs | 128 +++++++++++++----------
 datafusion/optimizer/src/push_down_filter.rs     |  89 +++++++++++-----
 datafusion/sqllogictest/test_files/joins.slt     |  17 +++
 3 files changed, 152 insertions(+), 82 deletions(-)

diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index cf9a59d6b8..7c866950a6 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -20,6 +20,7 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::{utils, OptimizerConfig, OptimizerRule};
+
 use datafusion_common::{plan_err, DataFusionError, Result};
 use datafusion_expr::expr::{BinaryExpr, Expr};
 use datafusion_expr::logical_plan::{
@@ -47,81 +48,93 @@ impl EliminateCrossJoin {
 /// For above queries, the join predicate is available in filters and they are moved to
 /// join nodes appropriately
 /// This fix helps to improve the performance of TPCH Q19. issue#78
-///
 impl OptimizerRule for EliminateCrossJoin {
     fn try_optimize(
         &self,
         plan: &LogicalPlan,
         config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
-        match plan {
+        let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
+        let mut all_inputs: Vec<LogicalPlan> = vec![];
+        let parent_predicate = match plan {
             LogicalPlan::Filter(filter) => {
-                let input = filter.input.as_ref().clone();
-
-                let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
-                let mut all_inputs: Vec<LogicalPlan> = vec![];
-                let did_flat_successfully = match &input {
+                let input = filter.input.as_ref();
+                match input {
                     LogicalPlan::Join(Join {
                         join_type: JoinType::Inner,
                         ..
                     })
-                    | LogicalPlan::CrossJoin(_) => try_flatten_join_inputs(
-                        &input,
-                        &mut possible_join_keys,
-                        &mut all_inputs,
-                    )?,
+                    | LogicalPlan::CrossJoin(_) => {
+                        if !try_flatten_join_inputs(
+                            input,
+                            &mut possible_join_keys,
+                            &mut all_inputs,
+                        )? {
+                            return Ok(None);
+                        }
+                        extract_possible_join_keys(
+                            &filter.predicate,
+                            &mut possible_join_keys,
+                        )?;
+                        Some(&filter.predicate)
+                    }
                     _ => {
                         return utils::optimize_children(self, plan, config);
                     }
-                };
-
-                if !did_flat_successfully {
+                }
+            }
+            LogicalPlan::Join(Join {
+                join_type: JoinType::Inner,
+                ..
+            }) => {
+                if !try_flatten_join_inputs(
+                    plan,
+                    &mut possible_join_keys,
+                    &mut all_inputs,
+                )? {
                     return Ok(None);
                 }
+                None
+            }
+            _ => return utils::optimize_children(self, plan, config),
+        };
 
-                let predicate = &filter.predicate;
-                // join keys are handled locally
-                let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new();
-
-                extract_possible_join_keys(predicate, &mut possible_join_keys)?;
+        // Join keys are handled locally:
+        let mut all_join_keys = HashSet::<(Expr, Expr)>::new();
+        let mut left = all_inputs.remove(0);
+        while !all_inputs.is_empty() {
+            left = find_inner_join(
+                &left,
+                &mut all_inputs,
+                &mut possible_join_keys,
+                &mut all_join_keys,
+            )?;
+        }
 
-                let mut left = all_inputs.remove(0);
-                while !all_inputs.is_empty() {
-                    left = find_inner_join(
-                        &left,
-                        &mut all_inputs,
-                        &mut possible_join_keys,
-                        &mut all_join_keys,
-                    )?;
-                }
+        left = utils::optimize_children(self, &left, config)?.unwrap_or(left);
 
-                left = utils::optimize_children(self, &left, config)?.unwrap_or(left);
+        if plan.schema() != left.schema() {
+            left = LogicalPlan::Projection(Projection::new_from_schema(
+                Arc::new(left),
+                plan.schema().clone(),
+            ));
+        }
 
-                if plan.schema() != left.schema() {
-                    left = LogicalPlan::Projection(Projection::new_from_schema(
-                        Arc::new(left.clone()),
-                        plan.schema().clone(),
-                    ));
-                }
+        let Some(predicate) = parent_predicate else {
+            return Ok(Some(left));
+        };
 
-                // if there are no join keys then do nothing.
-                if all_join_keys.is_empty() {
-                    Ok(Some(LogicalPlan::Filter(Filter::try_new(
-                        predicate.clone(),
-                        Arc::new(left),
-                    )?)))
-                } else {
-                    // remove join expressions from filter
-                    match remove_join_expressions(predicate, &all_join_keys)? {
-                        Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
-                            Filter::try_new(filter_expr, Arc::new(left))?,
-                        ))),
-                        _ => Ok(Some(left)),
-                    }
-                }
+        // If there are no join keys then do nothing:
+        if all_join_keys.is_empty() {
+            Filter::try_new(predicate.clone(), Arc::new(left))
+                .map(|f| Some(LogicalPlan::Filter(f)))
+        } else {
+            // Remove join expressions from filter:
+            match remove_join_expressions(predicate, &all_join_keys)? {
+                Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left))
+                    .map(|f| Some(LogicalPlan::Filter(f))),
+                _ => Ok(Some(left)),
             }
-
-            _ => utils::optimize_children(self, plan, config),
         }
     }
 
@@ -325,17 +338,16 @@ fn remove_join_expressions(
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use crate::optimizer::OptimizerContext;
+    use crate::test::*;
+
     use datafusion_expr::{
         binary_expr, col, lit,
         logical_plan::builder::LogicalPlanBuilder,
         Operator::{And, Or},
     };
 
-    use crate::optimizer::OptimizerContext;
-    use crate::test::*;
-
-    use super::*;
-
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) {
         let rule = EliminateCrossJoin::new();
         let optimized_plan = rule
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 4bea17500a..4eed39a089 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -15,25 +15,29 @@
 //! [`PushDownFilter`] Moves filters so they are applied as early as possible in
 //! the plan.
 
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
+
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_common::{
-    internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result,
+    internal_err, plan_datafusion_err, Column, DFSchema, DFSchemaRef, DataFusionError,
+    JoinConstraint, Result,
 };
 use datafusion_expr::expr::Alias;
+use datafusion_expr::expr_rewriter::replace_col;
+use datafusion_expr::logical_plan::{
+    CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union,
+};
 use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
-use datafusion_expr::Volatility;
 use datafusion_expr::{
-    and,
-    expr_rewriter::replace_col,
-    logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union},
-    or, BinaryExpr, Expr, Filter, Operator, ScalarFunctionDefinition,
-    TableProviderFilterPushDown,
+    and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder, Operator,
+    ScalarFunctionDefinition, TableProviderFilterPushDown, Volatility,
 };
+
 use itertools::Itertools;
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
 
 /// Optimizer rule for pushing (moving) filter expressions down in a plan so
 /// they are applied as early as possible.
@@ -848,17 +852,23 @@ impl OptimizerRule for PushDownFilter {
                     None => return Ok(None),
                 }
             }
-            LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
+            LogicalPlan::CrossJoin(cross_join) => {
                 let predicates = split_conjunction_owned(filter.predicate.clone());
-                push_down_all_join(
+                let join = convert_cross_join_to_inner_join(cross_join.clone())?;
+                let join_plan = LogicalPlan::Join(join);
+                let inputs = join_plan.inputs();
+                let left = inputs[0];
+                let right = inputs[1];
+                let plan = push_down_all_join(
                     predicates,
                     vec![],
-                    &filter.input,
+                    &join_plan,
                     left,
                     right,
                     vec![],
-                    false,
-                )?
+                    true,
+                )?;
+                convert_to_cross_join_if_beneficial(plan)?
             }
             LogicalPlan::TableScan(scan) => {
                 let filter_predicates = split_conjunction(&filter.predicate);
@@ -955,6 +965,36 @@ impl PushDownFilter {
     }
 }
 
+/// Convert cross join to join by pushing down filter predicate to the join condition
+fn convert_cross_join_to_inner_join(cross_join: CrossJoin) -> Result<Join> {
+    let CrossJoin { left, right, .. } = cross_join;
+    let join_schema = build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;
+    // predicate is given
+    Ok(Join {
+        left,
+        right,
+        join_type: JoinType::Inner,
+        join_constraint: JoinConstraint::On,
+        on: vec![],
+        filter: None,
+        schema: DFSchemaRef::new(join_schema),
+        null_equals_null: true,
+    })
+}
+
+/// Converts the inner join with empty equality predicate and empty filter condition to the cross join
+fn convert_to_cross_join_if_beneficial(plan: LogicalPlan) -> Result<LogicalPlan> {
+    if let LogicalPlan::Join(join) = &plan {
+        // Can be converted back to cross join
+        if join.on.is_empty() && join.filter.is_none() {
+            return LogicalPlanBuilder::from(join.left.as_ref().clone())
+                .cross_join(join.right.as_ref().clone())?
+                .build();
+        }
+    }
+    Ok(plan)
+}
+
 /// replaces columns by its name on the projection.
 pub fn replace_cols_by_name(
     e: Expr,
@@ -1026,13 +1066,16 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool {
 
 #[cfg(test)]
 mod tests {
+    use std::fmt::{Debug, Formatter};
+    use std::sync::Arc;
+
     use super::*;
     use crate::optimizer::Optimizer;
     use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
     use crate::test::*;
     use crate::OptimizerContext;
+
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use async_trait::async_trait;
     use datafusion_common::{DFSchema, DFSchemaRef};
     use datafusion_expr::logical_plan::table_scan;
     use datafusion_expr::{
@@ -1040,8 +1083,8 @@ mod tests {
         BinaryExpr, Expr, Extension, LogicalPlanBuilder, Operator, TableSource,
         TableType, UserDefinedLogicalNodeCore,
     };
-    use std::fmt::{Debug, Formatter};
-    use std::sync::Arc;
+
+    use async_trait::async_trait;
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
         crate::test::assert_optimized_plan_eq(
@@ -2665,14 +2708,12 @@ Projection: a, b
             .cross_join(right)?
             .filter(filter)?
             .build()?;
-
         let expected = "\
-        Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\
-        \n  CrossJoin:\
-        \n    Projection: test.a, test.b, test.c\
-        \n      TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\
-        \n    Projection: test1.a AS d, test1.a AS e\
-        \n      TableScan: test1";
+        Inner Join:  Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\
+        \n  Projection: test.a, test.b, test.c\
+        \n    TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\
+        \n  Projection: test1.a AS d, test1.a AS e\
+        \n    TableScan: test1";
         assert_optimized_plan_eq_with_rewrite_predicate(&plan, expected)?;
 
         // Originally global state which can help to avoid duplicate Filters been generated and pushed down.
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 1ad17fbb8c..eee213811f 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3466,6 +3466,23 @@ SortPreservingMergeExec: [a@0 ASC]
 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true
 
+query TT
+EXPLAIN SELECT *
+FROM annotated_data as l, annotated_data as r
+WHERE l.a > r.a
+----
+logical_plan
+Inner Join:  Filter: l.a > r.a
+--SubqueryAlias: l
+----TableScan: annotated_data projection=[a0, a, b, c, d]
+--SubqueryAlias: r
+----TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+NestedLoopJoinExec: join_type=Inner, filter=a@0 > a@1
+--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+
 ####
 # Config teardown
 ####

Reply via email to