mustafasrepo commented on code in PR #5880:
URL: https://github.com/apache/arrow-datafusion/pull/5880#discussion_r1159635631
##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -2225,21 +2221,39 @@ mod tests {
true,
)
.await?;
- let df = ctx.sql("EXPLAIN SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left
as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 <
t2.a1 + 10").await?;
- let physical_plan = df.create_physical_plan().await?;
- let task_ctx = ctx.task_ctx();
- let results = collect(physical_plan.clone(), task_ctx).await.unwrap();
- let formatted = pretty_format_batches(&results).unwrap().to_string();
- let found = formatted
- .lines()
- .any(|line| line.contains("SymmetricHashJoinExec"));
- assert!(found);
+ let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
+ let dataframe = ctx.sql(sql).await?;
+ let physical_plan = dataframe.create_physical_plan().await?;
+ let formatted =
crate::physical_plan::displayable(physical_plan.as_ref())
+ .indent()
+ .to_string();
+ let expected = {
+ [
+ "SymmetricHashJoinExec: join_type=Full, on=[(Column { name:
\"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr {
left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 },
cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right:
BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 },
cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right:
Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr {
expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options:
CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr {
expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options:
CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } }
} }",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([Column { name:
\"a2\", index: 1 }], 8), input_partitions=1",
+ // " CsvExec: files={1 group: [[tempdir/left.csv]]},
has_header=false, limit=None, projection=[a1, a2]",
+ " CoalesceBatchesExec: target_batch_size=8192",
+ " RepartitionExec: partitioning=Hash([Column { name:
\"a2\", index: 1 }], 8), input_partitions=1",
+ // " CsvExec: files={1 group: [[tempdir/right.csv]]},
has_header=false, limit=None, projection=[a1, a2]"
+ ]
+ };
+ let mut actual: Vec<&str> = formatted.trim().lines().collect();
+ // Remove CSV lines
+ actual.remove(3);
+ actual.remove(5);
+
+ assert_eq!(
+ expected,
+ actual[..],
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn join_change_in_planner_without_sort() -> Result<()> {
Review Comment:
These tests appear to check only physical plan generation. Since nothing is
being executed, `flavor = "multi_thread"` should be unnecessary. You can
write the tests `join_change_in_planner_without_sort_not_allowed` and
`join_change_in_planner_without_sort` without `flavor = "multi_thread"`, as in
`join_change_in_planner`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]