This is an automated email from the ASF dual-hosted git repository.
houqp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2f702e4 fix: sql planner creates cross join instead of inner join
from select predicates (#1566)
2f702e4 is described below
commit 2f702e4a65cc65745f373896b820b51bc6ab0096
Author: xudong.w <[email protected]>
AuthorDate: Fri Jan 21 11:37:38 2022 +0800
fix: sql planner creates cross join instead of inner join from select
predicates (#1566)
---
datafusion/src/sql/planner.rs | 109 ++++++++++++++++++++++++++++++++++--------
1 file changed, 90 insertions(+), 19 deletions(-)
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index bbd5aa7..ae9f272 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -697,7 +697,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias: Option<String>,
) -> Result<LogicalPlan> {
let plans = self.plan_from_tables(&select.from, ctes)?;
-
let plan = match &select.selection {
Some(predicate_expr) => {
// build join schema
@@ -714,33 +713,80 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
extract_possible_join_keys(&filter_expr, &mut
possible_join_keys)?;
let mut all_join_keys = HashSet::new();
- let mut left = plans[0].clone();
- for right in plans.iter().skip(1) {
- let left_schema = left.schema();
- let right_schema = right.schema();
+
+ let mut plans = plans.into_iter();
+ let mut left = plans.next().unwrap(); // have at least one plan
+
+ // List of the plans that have not yet been joined
+ let mut remaining_plans: Vec<Option<LogicalPlan>> =
+ plans.into_iter().map(Some).collect();
+
+ // Take from the list of remaining plans,
+ loop {
let mut join_keys = vec![];
- for (l, r) in &possible_join_keys {
- if left_schema.field_from_column(l).is_ok()
- && right_schema.field_from_column(r).is_ok()
- {
- join_keys.push((l.clone(), r.clone()));
- } else if left_schema.field_from_column(r).is_ok()
- && right_schema.field_from_column(l).is_ok()
- {
- join_keys.push((r.clone(), l.clone()));
- }
- }
+
+ // Search all remaining plans for the next to
+ // join. Prefer the first one that has a join
+ // predicate in the predicate lists
+ let plan_with_idx =
+ remaining_plans.iter().enumerate().find(|(_idx, plan)|
{
+ // skip plans that have been joined already
+ let plan = if let Some(plan) = plan {
+ plan
+ } else {
+ return false;
+ };
+
+ // can we find a match?
+ let left_schema = left.schema();
+ let right_schema = plan.schema();
+ for (l, r) in &possible_join_keys {
+ if left_schema.field_from_column(l).is_ok()
+ &&
right_schema.field_from_column(r).is_ok()
+ {
+ join_keys.push((l.clone(), r.clone()));
+ } else if
left_schema.field_from_column(r).is_ok()
+ &&
right_schema.field_from_column(l).is_ok()
+ {
+ join_keys.push((r.clone(), l.clone()));
+ }
+ }
+ // stop if we found join keys
+ !join_keys.is_empty()
+ });
+
+ // If we did not find join keys, either there are
+ // no more plans, or we can't find any plans that
+ // can be joined with predicates
if join_keys.is_empty() {
- left =
-
LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
+ assert!(plan_with_idx.is_none());
+
+ // pick the first non null plan to join
+ let plan_with_idx = remaining_plans
+ .iter()
+ .enumerate()
+ .find(|(_idx, plan)| plan.is_some());
+ if let Some((idx, _)) = plan_with_idx {
+ let plan = std::mem::take(&mut
remaining_plans[idx]).unwrap();
+ left = LogicalPlanBuilder::from(left)
+ .cross_join(&plan)?
+ .build()?;
+ } else {
+ // no more plans to join
+ break;
+ }
} else {
+ // have a plan
+ let (idx, _) = plan_with_idx.expect("found plan node");
+ let plan = std::mem::take(&mut
remaining_plans[idx]).unwrap();
+
let left_keys: Vec<Column> =
join_keys.iter().map(|(l, _)| l.clone()).collect();
let right_keys: Vec<Column> =
join_keys.iter().map(|(_, r)| r.clone()).collect();
let builder = LogicalPlanBuilder::from(left);
left = builder
- .join(right, JoinType::Inner, (left_keys,
right_keys))?
+ .join(&plan, JoinType::Inner, (left_keys,
right_keys))?
.build()?;
}
@@ -3818,6 +3864,31 @@ mod tests {
\n TableScan: public.person projection=None";
quick_test(sql, expected);
}
+
+ #[test]
+ fn cross_join_to_inner_join() {
+ let sql = "select person.id from person, orders, lineitem where
person.id = lineitem.l_item_id and orders.o_item_id = lineitem.l_description;";
+ let expected = "Projection: #person.id\
+ \n Join: #lineitem.l_description =
#orders.o_item_id\
+ \n Join: #person.id = #lineitem.l_item_id\
+ \n TableScan: person projection=None\
+ \n TableScan: lineitem projection=None\
+ \n TableScan: orders projection=None";
+ quick_test(sql, expected);
+ }
+
+ #[test]
+ fn cross_join_not_to_inner_join() {
+ let sql = "select person.id from person, orders, lineitem where
person.id = person.age;";
+ let expected = "Projection: #person.id\
+ \n Filter: #person.id = #person.age\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: person
projection=None\
+ \n TableScan: orders
projection=None\
+ \n TableScan: lineitem
projection=None";
+ quick_test(sql, expected);
+ }
}
fn parse_sql_number(n: &str) -> Result<Expr> {