This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 05f0fa1462 Minor: Update LogicalPlan::join_on API, use it more (#7814)
05f0fa1462 is described below
commit 05f0fa14628dab1cf27a3b15c3ae7d53cc01d1b2
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Oct 13 11:02:34 2023 -0400
Minor: Update LogicalPlan::join_on API, use it more (#7814)
* Minor: Update LogicalPlanBuilder::join_on API (accept `impl IntoIterator<Item = Expr>`), use it more
* fix doc
---
datafusion/expr/src/logical_plan/builder.rs | 11 ++++---
.../src/decorrelate_predicate_subquery.rs | 9 ++----
datafusion/optimizer/src/eliminate_join.rs | 8 ++---
.../optimizer/src/extract_equijoin_predicate.rs | 36 ++++++----------------
.../optimizer/src/scalar_subquery_to_join.rs | 14 ++-------
datafusion/sql/src/relation/join.rs | 7 +----
datafusion/substrait/src/logical_plan/consumer.rs | 7 +----
7 files changed, 24 insertions(+), 68 deletions(-)
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 539f6382ad..c3f576195e 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -650,9 +650,8 @@ impl LogicalPlanBuilder {
///
/// let right_plan = LogicalPlanBuilder::scan("right", right_table,
None)?.build()?;
///
- /// let exprs = vec![col("left.a").eq(col("right.a")),
col("left.b").not_eq(col("right.b"))]
- /// .into_iter()
- /// .reduce(Expr::and);
+ /// let exprs = vec![col("left.a").eq(col("right.a")),
col("left.b").not_eq(col("right.b"))];
+ ///
/// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
/// .join_on(right_plan, JoinType::Inner, exprs)?
/// .build()?;
@@ -663,13 +662,15 @@ impl LogicalPlanBuilder {
self,
right: LogicalPlan,
join_type: JoinType,
- on_exprs: Option<Expr>,
+ on_exprs: impl IntoIterator<Item = Expr>,
) -> Result<Self> {
+ let filter = on_exprs.into_iter().reduce(Expr::and);
+
self.join_detailed(
right,
join_type,
(Vec::<Column>::new(), Vec::<Column>::new()),
- on_exprs,
+ filter,
false,
)
}
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 432d7f053a..96b46663d8 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -21,7 +21,7 @@ use crate::utils::{conjunction, replace_qualified_name,
split_conjunction};
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::TreeNode;
-use datafusion_common::{plan_err, Column, DataFusionError, Result};
+use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
@@ -282,12 +282,7 @@ fn build_join(
false => JoinType::LeftSemi,
};
let new_plan = LogicalPlanBuilder::from(left.clone())
- .join(
- sub_query_alias,
- join_type,
- (Vec::<Column>::new(), Vec::<Column>::new()),
- Some(join_filter),
- )?
+ .join_on(sub_query_alias, join_type, Some(join_filter))?
.build()?;
debug!(
"predicate subquery optimized:\n{}",
diff --git a/datafusion/optimizer/src/eliminate_join.rs
b/datafusion/optimizer/src/eliminate_join.rs
index 00abcdcc68..0dbebcc8a0 100644
--- a/datafusion/optimizer/src/eliminate_join.rs
+++ b/datafusion/optimizer/src/eliminate_join.rs
@@ -77,7 +77,7 @@ impl OptimizerRule for EliminateJoin {
mod tests {
use crate::eliminate_join::EliminateJoin;
use crate::test::*;
- use datafusion_common::{Column, Result, ScalarValue};
+ use datafusion_common::{Result, ScalarValue};
use datafusion_expr::JoinType::Inner;
use datafusion_expr::{logical_plan::builder::LogicalPlanBuilder, Expr,
LogicalPlan};
use std::sync::Arc;
@@ -89,10 +89,9 @@ mod tests {
#[test]
fn join_on_false() -> Result<()> {
let plan = LogicalPlanBuilder::empty(false)
- .join(
+ .join_on(
LogicalPlanBuilder::empty(false).build()?,
Inner,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(Expr::Literal(ScalarValue::Boolean(Some(false)))),
)?
.build()?;
@@ -104,10 +103,9 @@ mod tests {
#[test]
fn join_on_true() -> Result<()> {
let plan = LogicalPlanBuilder::empty(false)
- .join(
+ .join_on(
LogicalPlanBuilder::empty(false).build()?,
Inner,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(Expr::Literal(ScalarValue::Boolean(Some(true)))),
)?
.build()?;
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs
b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index e328eeeb00..575969fbf7 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -161,7 +161,6 @@ mod tests {
use super::*;
use crate::test::*;
use arrow::datatypes::DataType;
- use datafusion_common::Column;
use datafusion_expr::{
col, lit, logical_plan::builder::LogicalPlanBuilder, JoinType,
};
@@ -182,12 +181,7 @@ mod tests {
let t2 = test_table_scan_with_name("t2")?;
let plan = LogicalPlanBuilder::from(t1)
- .join(
- t2,
- JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
- Some(col("t1.a").eq(col("t2.a"))),
- )?
+ .join_on(t2, JoinType::Left, Some(col("t1.a").eq(col("t2.a"))))?
.build()?;
let expected = "Left Join: t1.a = t2.a [a:UInt32, b:UInt32, c:UInt32,
a:UInt32;N, b:UInt32;N, c:UInt32;N]\
\n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\
@@ -202,10 +196,9 @@ mod tests {
let t2 = test_table_scan_with_name("t2")?;
let plan = LogicalPlanBuilder::from(t1)
- .join(
+ .join_on(
t2,
JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some((col("t1.a") + lit(10i64)).eq(col("t2.a") * lit(2u32))),
)?
.build()?;
@@ -222,10 +215,9 @@ mod tests {
let t2 = test_table_scan_with_name("t2")?;
let plan = LogicalPlanBuilder::from(t1)
- .join(
+ .join_on(
t2,
JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(
(col("t1.a") + lit(10i64))
.gt_eq(col("t2.a") * lit(2u32))
@@ -273,10 +265,9 @@ mod tests {
let t2 = test_table_scan_with_name("t2")?;
let plan = LogicalPlanBuilder::from(t1)
- .join(
+ .join_on(
t2,
JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(
col("t1.c")
.eq(col("t2.c"))
@@ -301,10 +292,9 @@ mod tests {
let t3 = test_table_scan_with_name("t3")?;
let input = LogicalPlanBuilder::from(t2)
- .join(
+ .join_on(
t3,
JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(
col("t2.a")
.eq(col("t3.a"))
@@ -313,10 +303,9 @@ mod tests {
)?
.build()?;
let plan = LogicalPlanBuilder::from(t1)
- .join(
+ .join_on(
input,
JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(
col("t1.a")
.eq(col("t2.a"))
@@ -340,10 +329,9 @@ mod tests {
let t3 = test_table_scan_with_name("t3")?;
let input = LogicalPlanBuilder::from(t2)
- .join(
+ .join_on(
t3,
JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(
col("t2.a")
.eq(col("t3.a"))
@@ -352,10 +340,9 @@ mod tests {
)?
.build()?;
let plan = LogicalPlanBuilder::from(t1)
- .join(
+ .join_on(
input,
JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
Some(col("t1.a").eq(col("t2.a")).and(col("t2.c").eq(col("t3.c")))),
)?
.build()?;
@@ -383,12 +370,7 @@ mod tests {
)
.alias("t1.a + 1 = t2.a + 2");
let plan = LogicalPlanBuilder::from(t1)
- .join(
- t2,
- JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
- Some(filter),
- )?
+ .join_on(t2, JoinType::Left, Some(filter))?
.build()?;
let expected = "Left Join: t1.a + CAST(Int64(1) AS UInt32) = t2.a +
CAST(Int32(2) AS UInt32) [a:UInt32, b:UInt32, c:UInt32, a:UInt32;N, b:UInt32;N,
c:UInt32;N]\
\n TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]\
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs
b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 96d2f45d80..7ac0c25119 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -315,24 +315,14 @@ fn build_join(
_ => {
// if not correlated, group down to 1 row and left join on
that (preserving row count)
LogicalPlanBuilder::from(filter_input.clone())
- .join(
- sub_query_alias,
- JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
- None,
- )?
+ .join_on(sub_query_alias, JoinType::Left, None)?
.build()?
}
}
} else {
// left join if correlated, grouping by the join keys so we don't
change row count
LogicalPlanBuilder::from(filter_input.clone())
- .join(
- sub_query_alias,
- JoinType::Left,
- (Vec::<Column>::new(), Vec::<Column>::new()),
- join_filter_opt,
- )?
+ .join_on(sub_query_alias, JoinType::Left, join_filter_opt)?
.build()?
};
let mut computation_project_expr = HashMap::new();
diff --git a/datafusion/sql/src/relation/join.rs
b/datafusion/sql/src/relation/join.rs
index 0113f337e6..b119672eae 100644
--- a/datafusion/sql/src/relation/join.rs
+++ b/datafusion/sql/src/relation/join.rs
@@ -132,12 +132,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// parse ON expression
let expr = self.sql_to_expr(sql_expr, &join_schema,
planner_context)?;
LogicalPlanBuilder::from(left)
- .join(
- right,
- join_type,
- (Vec::<Column>::new(), Vec::<Column>::new()),
- Some(expr),
- )?
+ .join_on(right, join_type, Some(expr))?
.build()
}
JoinConstraint::Using(idents) => {
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs
b/datafusion/substrait/src/logical_plan/consumer.rs
index e1dde39427..8d99d1981b 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -396,12 +396,7 @@ pub async fn from_substrait_rel(
}
None => match &join_filter {
Some(_) => left
- .join(
- right.build()?,
- join_type,
- (Vec::<Column>::new(), Vec::<Column>::new()),
- join_filter,
- )?
+ .join_on(right.build()?, join_type, join_filter)?
.build(),
None => plan_err!("Join without join keys require a valid
filter"),
},