This is an automated email from the ASF dual-hosted git repository.
korowa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e9435a920e Fix:fix HashJoin projection swap (#12967)
e9435a920e is described below
commit e9435a920ed84a1956b23e7ab6d13fe833cce3eb
Author: yi wang <[email protected]>
AuthorDate: Sat Oct 19 00:52:23 2024 +0800
Fix:fix HashJoin projection swap (#12967)
* swap_hash_join works with joins with projections
* use non swapped hash join's projection
* clean up
* fix hashjoin projection swap.
* assert hashjoinexec.
* Update datafusion/core/src/physical_optimizer/join_selection.rs
Co-authored-by: Eduard Karacharov <[email protected]>
* fix clippy.
---------
Co-authored-by: Onur Satici <[email protected]>
Co-authored-by: Eduard Karacharov <[email protected]>
---
.../core/src/physical_optimizer/join_selection.rs | 31 +++++++++++++++++++++-
1 file changed, 30 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 499fb9cbbc..dfaa7dbb89 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -183,13 +183,15 @@ pub fn swap_hash_join(
partition_mode,
hash_join.null_equals_null(),
)?;
+ // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
hash_join.join_type(),
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
- ) {
+ ) || hash_join.projection.is_some()
+ {
Ok(Arc::new(new_join))
} else {
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
@@ -1287,6 +1289,33 @@ mod tests_statistical {
);
}
+ #[tokio::test]
+ async fn test_hash_join_swap_on_joins_with_projections() -> Result<()> {
+ let (big, small) = create_big_and_small();
+ let join = Arc::new(HashJoinExec::try_new(
+ Arc::clone(&big),
+ Arc::clone(&small),
+ vec![(
+ Arc::new(Column::new_with_schema("big_col", &big.schema())?),
+ Arc::new(Column::new_with_schema("small_col", &small.schema())?),
+ )],
+ None,
+ &JoinType::Inner,
+ Some(vec![1]),
+ PartitionMode::Partitioned,
+ false,
+ )?);
+ let swapped = swap_hash_join(&join.clone(), PartitionMode::Partitioned)
+ .expect("swap_hash_join must support joins with projections");
+ let swapped_join = swapped.as_any().downcast_ref::<HashJoinExec>().expect(
+ "ProjectionExec won't be added above if HashJoinExec contains embedded projection",
+ );
+ assert_eq!(swapped_join.projection, Some(vec![0_usize]));
+ assert_eq!(swapped.schema().fields.len(), 1);
+ assert_eq!(swapped.schema().fields[0].name(), "small_col");
+ Ok(())
+ }
+
#[tokio::test]
async fn test_swap_reverting_projection() {
let left_schema = Schema::new(vec![
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]