alamb commented on code in PR #4602:
URL: https://github.com/apache/arrow-datafusion/pull/4602#discussion_r1050083888
##########
datafusion/proto/src/logical_plan.rs:
##########
@@ -727,17 +733,24 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec
)?);
let builder = match join_constraint.into() {
- JoinConstraint::On => builder.join(
+ JoinConstraint::On => builder.join_with_expr_keys(
&into_logical_plan!(join.right, ctx, extension_codec)?,
join_type.into(),
(left_keys, right_keys),
filter,
)?,
- JoinConstraint::Using => builder.join_using(
- &into_logical_plan!(join.right, ctx, extension_codec)?,
- join_type.into(),
- left_keys,
- )?,
+ JoinConstraint::Using => {
+ // The equijoin keys in using-join must be column.
Review Comment:
👍
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -792,6 +798,97 @@ impl LogicalPlanBuilder {
pub fn build(&self) -> Result<LogicalPlan> {
Ok(self.plan.clone())
}
+
+ /// Apply a join with expression on constraint.
+ ///
+ /// Filter expression expected to contain non-equality predicates that can
not be pushed
+ /// down to any of join inputs.
+ /// In case of outer join, filter applied to only matched rows.
+ pub fn join_with_expr_keys(
+ &self,
+ right: &LogicalPlan,
+ join_type: JoinType,
+ join_keys: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
Review Comment:
Maybe calling this parameter `equi_exprs` would better reflect what it is
(exprs, not column keys) 🤔
##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -2448,3 +2492,289 @@ async fn reduce_cross_join_with_wildcard_and_expr() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn both_side_expr_key_inner_join() -> Result<()> {
+ let test_repartition_joins = vec![true, false];
+ for repartition_joins in test_repartition_joins {
+ let ctx = create_join_context("t1_id", "t2_id", repartition_joins)?;
+
+ let sql = "SELECT t1.t1_id, t2.t2_id, t1.t1_name \
+ FROM t1 \
+ INNER JOIN t2 \
+ ON t1.t1_id + cast(12 as INT UNSIGNED) = t2.t2_id +
cast(1 as INT UNSIGNED)";
+
+ let msg = format!("Creating logical plan for '{}'", sql);
+ let plan = ctx.create_logical_plan(sql).expect(&msg);
+ let state = ctx.state();
+ let logical_plan = state.optimize(&plan)?;
+ let physical_plan = state.create_physical_plan(&logical_plan).await?;
+
+ let expected = if repartition_joins {
+ vec![
+ "ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id,
t1_name@1 as t1_name]",
+ " ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as
t1_name, t2_id@3 as t2_id]",
Review Comment:
👍 I think that could be improved as a follow-on PR
##########
datafusion/sql/src/planner.rs:
##########
@@ -6074,12 +6055,10 @@ mod tests {
ON orders.customer_id * 2 = person.id + 10";
let expected = "Projection: person.id, orders.order_id\
- \n Projection: person.id, person.first_name, person.last_name,
person.age, person.state, person.salary, person.birth_date, person.😀,
orders.order_id, orders.customer_id, orders.o_item_id, orders.qty,
orders.price, orders.delivered\
- \n Inner Join: person.id + Int64(10) = orders.customer_id *
Int64(2)\
- \n Projection: person.id, person.first_name, person.last_name,
person.age, person.state, person.salary, person.birth_date, person.😀, person.id
+ Int64(10)\
- \n TableScan: person\
- \n Projection: orders.order_id, orders.customer_id,
orders.o_item_id, orders.qty, orders.price, orders.delivered,
orders.customer_id * Int64(2)\
- \n TableScan: orders";
+ \n Inner Join: person.id + Int64(10) = orders.customer_id * Int64(2)\
Review Comment:
This plan is certainly much nicer -- I don't understand where all the other
columns used to come from, but this is 👍
##########
datafusion/optimizer/src/eliminate_cross_join.rs:
##########
@@ -1103,14 +1082,10 @@ mod tests {
.build()?;
let expected = vec![
- "Filter: t2.c < UInt32(20) [a:UInt32, b:UInt32, c:UInt32,
a:UInt32, b:UInt32, c:UInt32]",
- " Projection: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c [a:UInt32,
b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]",
- " Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2)
[a:UInt32, b:UInt32, c:UInt32, t1.a + UInt32(100):UInt32, a:UInt32, b:UInt32,
c:UInt32, t2.a * UInt32(2):UInt32]",
- " Projection: t1.a, t1.b, t1.c, t1.a + UInt32(100)
[a:UInt32, b:UInt32, c:UInt32, t1.a + UInt32(100):UInt32]",
- " TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]",
- " Projection: t2.a, t2.b, t2.c, t2.a * UInt32(2) [a:UInt32,
b:UInt32, c:UInt32, t2.a * UInt32(2):UInt32]",
- " TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]",
- ];
+ "Filter: t2.c < UInt32(20) [a:UInt32, b:UInt32, c:UInt32,
a:UInt32, b:UInt32, c:UInt32]",
Review Comment:
It is quite cool to see the expressions directly in the Join without
needing a projection to compute them.
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -792,6 +798,97 @@ impl LogicalPlanBuilder {
pub fn build(&self) -> Result<LogicalPlan> {
Ok(self.plan.clone())
}
+
+ /// Apply a join with expression on constraint.
Review Comment:
```suggestion
/// Apply a join with the expression on constraint.
///
/// join_keys are "equijoin" predicate expressions on the existing and right inputs, respectively.
///
/// filter: any other filter expression to apply during the join. join_keys predicates are likely
/// to be evaluated more quickly than the filter expressions.
```
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1646,8 +1650,8 @@ pub struct Join {
pub left: Arc<LogicalPlan>,
/// Right input
pub right: Arc<LogicalPlan>,
- /// Equijoin clause expressed as pairs of (left, right) join columns
- pub on: Vec<(Column, Column)>,
+ /// Equijoin clause expressed as pairs of (left, right) join expressions
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]