This is an automated email from the ASF dual-hosted git repository.

korowa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e9435a920e Fix:fix HashJoin projection swap (#12967)
e9435a920e is described below

commit e9435a920ed84a1956b23e7ab6d13fe833cce3eb
Author: yi wang <[email protected]>
AuthorDate: Sat Oct 19 00:52:23 2024 +0800

    Fix:fix HashJoin projection swap (#12967)
    
    * swap_hash_join works with joins with projections
    
    * use non swapped hash join's projection
    
    * clean up
    
    * fix hashjoin projection swap.
    
    * assert hashjoinexec.
    
    * Update datafusion/core/src/physical_optimizer/join_selection.rs
    
    Co-authored-by: Eduard Karacharov <[email protected]>
    
    * fix clippy.
    
    ---------
    
    Co-authored-by: Onur Satici <[email protected]>
    Co-authored-by: Eduard Karacharov <[email protected]>
---
 .../core/src/physical_optimizer/join_selection.rs  | 31 +++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 499fb9cbbc..dfaa7dbb89 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -183,13 +183,15 @@ pub fn swap_hash_join(
         partition_mode,
         hash_join.null_equals_null(),
     )?;
+    // In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
     if matches!(
         hash_join.join_type(),
         JoinType::LeftSemi
             | JoinType::RightSemi
             | JoinType::LeftAnti
             | JoinType::RightAnti
-    ) {
+    ) || hash_join.projection.is_some()
+    {
         Ok(Arc::new(new_join))
     } else {
         // TODO avoid adding ProjectionExec again and again, only adding Final Projection
@@ -1287,6 +1289,33 @@ mod tests_statistical {
         );
     }
 
+    #[tokio::test]
+    async fn test_hash_join_swap_on_joins_with_projections() -> Result<()> {
+        let (big, small) = create_big_and_small();
+        let join = Arc::new(HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Arc::new(Column::new_with_schema("big_col", &big.schema())?),
+                Arc::new(Column::new_with_schema("small_col", &small.schema())?),
+            )],
+            None,
+            &JoinType::Inner,
+            Some(vec![1]),
+            PartitionMode::Partitioned,
+            false,
+        )?);
+        let swapped = swap_hash_join(&join.clone(), PartitionMode::Partitioned)
+            .expect("swap_hash_join must support joins with projections");
+        let swapped_join = swapped.as_any().downcast_ref::<HashJoinExec>().expect(
+            "ProjectionExec won't be added above if HashJoinExec contains embedded projection",
+        );
+        assert_eq!(swapped_join.projection, Some(vec![0_usize]));
+        assert_eq!(swapped.schema().fields.len(), 1);
+        assert_eq!(swapped.schema().fields[0].name(), "small_col");
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_swap_reverting_projection() {
         let left_schema = Schema::new(vec![


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to