This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3698693fab Filter pushdown into cross join (#8626)
3698693fab is described below
commit 3698693fab040dfb077edaf763b6935e9f42ea06
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Dec 25 10:43:52 2023 +0300
Filter pushdown into cross join (#8626)
* Initial commit
* Simplifications
* Review
* Review Part 2
* More idiomatic Rust
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/optimizer/src/eliminate_cross_join.rs | 128 +++++++++++++----------
datafusion/optimizer/src/push_down_filter.rs | 89 +++++++++++-----
datafusion/sqllogictest/test_files/joins.slt | 17 +++
3 files changed, 152 insertions(+), 82 deletions(-)
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs
b/datafusion/optimizer/src/eliminate_cross_join.rs
index cf9a59d6b8..7c866950a6 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -20,6 +20,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use crate::{utils, OptimizerConfig, OptimizerRule};
+
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::expr::{BinaryExpr, Expr};
use datafusion_expr::logical_plan::{
@@ -47,81 +48,93 @@ impl EliminateCrossJoin {
/// For above queries, the join predicate is available in filters and they are
moved to
/// join nodes appropriately
/// This fix helps to improve the performance of TPCH Q19. issue#78
-///
impl OptimizerRule for EliminateCrossJoin {
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
- match plan {
+ let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
+ let mut all_inputs: Vec<LogicalPlan> = vec![];
+ let parent_predicate = match plan {
LogicalPlan::Filter(filter) => {
- let input = filter.input.as_ref().clone();
-
- let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
- let mut all_inputs: Vec<LogicalPlan> = vec![];
- let did_flat_successfully = match &input {
+ let input = filter.input.as_ref();
+ match input {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
})
- | LogicalPlan::CrossJoin(_) => try_flatten_join_inputs(
- &input,
- &mut possible_join_keys,
- &mut all_inputs,
- )?,
+ | LogicalPlan::CrossJoin(_) => {
+ if !try_flatten_join_inputs(
+ input,
+ &mut possible_join_keys,
+ &mut all_inputs,
+ )? {
+ return Ok(None);
+ }
+ extract_possible_join_keys(
+ &filter.predicate,
+ &mut possible_join_keys,
+ )?;
+ Some(&filter.predicate)
+ }
_ => {
return utils::optimize_children(self, plan, config);
}
- };
-
- if !did_flat_successfully {
+ }
+ }
+ LogicalPlan::Join(Join {
+ join_type: JoinType::Inner,
+ ..
+ }) => {
+ if !try_flatten_join_inputs(
+ plan,
+ &mut possible_join_keys,
+ &mut all_inputs,
+ )? {
return Ok(None);
}
+ None
+ }
+ _ => return utils::optimize_children(self, plan, config),
+ };
- let predicate = &filter.predicate;
- // join keys are handled locally
- let mut all_join_keys: HashSet<(Expr, Expr)> = HashSet::new();
-
- extract_possible_join_keys(predicate, &mut
possible_join_keys)?;
+ // Join keys are handled locally:
+ let mut all_join_keys = HashSet::<(Expr, Expr)>::new();
+ let mut left = all_inputs.remove(0);
+ while !all_inputs.is_empty() {
+ left = find_inner_join(
+ &left,
+ &mut all_inputs,
+ &mut possible_join_keys,
+ &mut all_join_keys,
+ )?;
+ }
- let mut left = all_inputs.remove(0);
- while !all_inputs.is_empty() {
- left = find_inner_join(
- &left,
- &mut all_inputs,
- &mut possible_join_keys,
- &mut all_join_keys,
- )?;
- }
+ left = utils::optimize_children(self, &left, config)?.unwrap_or(left);
- left = utils::optimize_children(self, &left,
config)?.unwrap_or(left);
+ if plan.schema() != left.schema() {
+ left = LogicalPlan::Projection(Projection::new_from_schema(
+ Arc::new(left),
+ plan.schema().clone(),
+ ));
+ }
- if plan.schema() != left.schema() {
- left = LogicalPlan::Projection(Projection::new_from_schema(
- Arc::new(left.clone()),
- plan.schema().clone(),
- ));
- }
+ let Some(predicate) = parent_predicate else {
+ return Ok(Some(left));
+ };
- // if there are no join keys then do nothing.
- if all_join_keys.is_empty() {
- Ok(Some(LogicalPlan::Filter(Filter::try_new(
- predicate.clone(),
- Arc::new(left),
- )?)))
- } else {
- // remove join expressions from filter
- match remove_join_expressions(predicate, &all_join_keys)? {
- Some(filter_expr) => Ok(Some(LogicalPlan::Filter(
- Filter::try_new(filter_expr, Arc::new(left))?,
- ))),
- _ => Ok(Some(left)),
- }
- }
+ // If there are no join keys, keep the original filter predicate unchanged:
+ if all_join_keys.is_empty() {
+ Filter::try_new(predicate.clone(), Arc::new(left))
+ .map(|f| Some(LogicalPlan::Filter(f)))
+ } else {
+ // Remove join expressions from filter:
+ match remove_join_expressions(predicate, &all_join_keys)? {
+ Some(filter_expr) => Filter::try_new(filter_expr,
Arc::new(left))
+ .map(|f| Some(LogicalPlan::Filter(f))),
+ _ => Ok(Some(left)),
}
-
- _ => utils::optimize_children(self, plan, config),
}
}
@@ -325,17 +338,16 @@ fn remove_join_expressions(
#[cfg(test)]
mod tests {
+ use super::*;
+ use crate::optimizer::OptimizerContext;
+ use crate::test::*;
+
use datafusion_expr::{
binary_expr, col, lit,
logical_plan::builder::LogicalPlanBuilder,
Operator::{And, Or},
};
- use crate::optimizer::OptimizerContext;
- use crate::test::*;
-
- use super::*;
-
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) {
let rule = EliminateCrossJoin::new();
let optimized_plan = rule
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index 4bea17500a..4eed39a089 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -15,25 +15,29 @@
//! [`PushDownFilter`] Moves filters so they are applied as early as possible
in
//! the plan.
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
+
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{
- internal_err, plan_datafusion_err, Column, DFSchema, DataFusionError,
Result,
+ internal_err, plan_datafusion_err, Column, DFSchema, DFSchemaRef,
DataFusionError,
+ JoinConstraint, Result,
};
use datafusion_expr::expr::Alias;
+use datafusion_expr::expr_rewriter::replace_col;
+use datafusion_expr::logical_plan::{
+ CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union,
+};
use datafusion_expr::utils::{conjunction, split_conjunction,
split_conjunction_owned};
-use datafusion_expr::Volatility;
use datafusion_expr::{
- and,
- expr_rewriter::replace_col,
- logical_plan::{CrossJoin, Join, JoinType, LogicalPlan, TableScan, Union},
- or, BinaryExpr, Expr, Filter, Operator, ScalarFunctionDefinition,
- TableProviderFilterPushDown,
+ and, build_join_schema, or, BinaryExpr, Expr, Filter, LogicalPlanBuilder,
Operator,
+ ScalarFunctionDefinition, TableProviderFilterPushDown, Volatility,
};
+
use itertools::Itertools;
-use std::collections::{HashMap, HashSet};
-use std::sync::Arc;
/// Optimizer rule for pushing (moving) filter expressions down in a plan so
/// they are applied as early as possible.
@@ -848,17 +852,23 @@ impl OptimizerRule for PushDownFilter {
None => return Ok(None),
}
}
- LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
+ LogicalPlan::CrossJoin(cross_join) => {
let predicates =
split_conjunction_owned(filter.predicate.clone());
- push_down_all_join(
+ let join =
convert_cross_join_to_inner_join(cross_join.clone())?;
+ let join_plan = LogicalPlan::Join(join);
+ let inputs = join_plan.inputs();
+ let left = inputs[0];
+ let right = inputs[1];
+ let plan = push_down_all_join(
predicates,
vec![],
- &filter.input,
+ &join_plan,
left,
right,
vec![],
- false,
- )?
+ true,
+ )?;
+ convert_to_cross_join_if_beneficial(plan)?
}
LogicalPlan::TableScan(scan) => {
let filter_predicates = split_conjunction(&filter.predicate);
@@ -955,6 +965,36 @@ impl PushDownFilter {
}
}
+/// Convert a cross join into an inner join with no join keys and no filter, so
that filter predicates can subsequently be pushed into its join condition
+fn convert_cross_join_to_inner_join(cross_join: CrossJoin) -> Result<Join> {
+ let CrossJoin { left, right, .. } = cross_join;
+ let join_schema = build_join_schema(left.schema(), right.schema(),
&JoinType::Inner)?;
+ // No join condition is set here; the caller pushes the filter predicates into this join
+ Ok(Join {
+ left,
+ right,
+ join_type: JoinType::Inner,
+ join_constraint: JoinConstraint::On,
+ on: vec![],
+ filter: None,
+ schema: DFSchemaRef::new(join_schema),
+ null_equals_null: true,
+ })
+}
+
+/// Converts an inner join with no equality predicates and no filter condition
back into a cross join; any other plan is returned unchanged
+fn convert_to_cross_join_if_beneficial(plan: LogicalPlan) ->
Result<LogicalPlan> {
+ if let LogicalPlan::Join(join) = &plan {
+ // Can be converted back to cross join
+ if join.on.is_empty() && join.filter.is_none() {
+ return LogicalPlanBuilder::from(join.left.as_ref().clone())
+ .cross_join(join.right.as_ref().clone())?
+ .build();
+ }
+ }
+ Ok(plan)
+}
+
/// replaces columns by its name on the projection.
pub fn replace_cols_by_name(
e: Expr,
@@ -1026,13 +1066,16 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>)
-> bool {
#[cfg(test)]
mod tests {
+ use std::fmt::{Debug, Formatter};
+ use std::sync::Arc;
+
use super::*;
use crate::optimizer::Optimizer;
use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use crate::test::*;
use crate::OptimizerContext;
+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use async_trait::async_trait;
use datafusion_common::{DFSchema, DFSchemaRef};
use datafusion_expr::logical_plan::table_scan;
use datafusion_expr::{
@@ -1040,8 +1083,8 @@ mod tests {
BinaryExpr, Expr, Extension, LogicalPlanBuilder, Operator, TableSource,
TableType, UserDefinedLogicalNodeCore,
};
- use std::fmt::{Debug, Formatter};
- use std::sync::Arc;
+
+ use async_trait::async_trait;
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
crate::test::assert_optimized_plan_eq(
@@ -2665,14 +2708,12 @@ Projection: a, b
.cross_join(right)?
.filter(filter)?
.build()?;
-
let expected = "\
- Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c <
UInt32(10)\
- \n CrossJoin:\
- \n Projection: test.a, test.b, test.c\
- \n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c <
UInt32(10)]\
- \n Projection: test1.a AS d, test1.a AS e\
- \n TableScan: test1";
+ Inner Join: Filter: test.a = d AND test.b > UInt32(1) OR test.b = e
AND test.c < UInt32(10)\
+ \n Projection: test.a, test.b, test.c\
+ \n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c <
UInt32(10)]\
+ \n Projection: test1.a AS d, test1.a AS e\
+ \n TableScan: test1";
assert_optimized_plan_eq_with_rewrite_predicate(&plan, expected)?;
// Originally global state which can help to avoid duplicate Filters
been generated and pushed down.
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 1ad17fbb8c..eee213811f 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3466,6 +3466,23 @@ SortPreservingMergeExec: [a@0 ASC]
----------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
------------------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b],
output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], has_header=true
+query TT
+EXPLAIN SELECT *
+FROM annotated_data as l, annotated_data as r
+WHERE l.a > r.a
+----
+logical_plan
+Inner Join: Filter: l.a > r.a
+--SubqueryAlias: l
+----TableScan: annotated_data projection=[a0, a, b, c, d]
+--SubqueryAlias: r
+----TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+NestedLoopJoinExec: join_type=Inner, filter=a@0 > a@1
+--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST],
has_header=true
+--CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST],
has_header=true
+
####
# Config teardown
####