This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 701cb00e3b fix: join swap for projected semi/anti joins (#13022)
701cb00e3b is described below

commit 701cb00e3b9b24b00908ad4a07ed01d68ccfa7c2
Author: Eduard Karacharov <[email protected]>
AuthorDate: Mon Oct 21 18:48:24 2024 +0300

    fix: join swap for projected semi/anti joins (#13022)
---
 .../core/src/physical_optimizer/join_selection.rs  | 85 +++++++++++++++++-----
 1 file changed, 65 insertions(+), 20 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index dfaa7dbb89..1c63df1f02 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -140,20 +140,32 @@ fn swap_join_projection(
     left_schema_len: usize,
     right_schema_len: usize,
     projection: Option<&Vec<usize>>,
+    join_type: &JoinType,
 ) -> Option<Vec<usize>> {
-    projection.map(|p| {
-        p.iter()
-            .map(|i| {
-                // If the index is less than the left schema length, it is from the left schema, so we add the right schema length to it.
-                // Otherwise, it is from the right schema, so we subtract the left schema length from it.
-                if *i < left_schema_len {
-                    *i + right_schema_len
-                } else {
-                    *i - left_schema_len
-                }
-            })
-            .collect()
-    })
+    match join_type {
+        // For Anti/Semi join types, projection should remain unmodified,
+        // since these joins output schema remains the same after swap
+        JoinType::LeftAnti
+        | JoinType::LeftSemi
+        | JoinType::RightAnti
+        | JoinType::RightSemi => projection.cloned(),
+
+        _ => projection.map(|p| {
+            p.iter()
+                .map(|i| {
+                    // If the index is less than the left schema length, it is from
+                    // the left schema, so we add the right schema length to it.
+                    // Otherwise, it is from the right schema, so we subtract the left
+                    // schema length from it.
+                    if *i < left_schema_len {
+                        *i + right_schema_len
+                    } else {
+                        *i - left_schema_len
+                    }
+                })
+                .collect()
+        }),
+    }
 }
 
 /// This function swaps the inputs of the given join operator.
@@ -179,6 +191,7 @@ pub fn swap_hash_join(
             left.schema().fields().len(),
             right.schema().fields().len(),
             hash_join.projection.as_ref(),
+            hash_join.join_type(),
         ),
         partition_mode,
         hash_join.null_equals_null(),
@@ -1289,27 +1302,59 @@ mod tests_statistical {
         );
     }
 
+    #[rstest(
+        join_type, projection, small_on_right,
+        case::inner(JoinType::Inner, vec![1], true),
+        case::left(JoinType::Left, vec![1], true),
+        case::right(JoinType::Right, vec![1], true),
+        case::full(JoinType::Full, vec![1], true),
+        case::left_anti(JoinType::LeftAnti, vec![0], false),
+        case::left_semi(JoinType::LeftSemi, vec![0], false),
+        case::right_anti(JoinType::RightAnti, vec![0], true),
+        case::right_semi(JoinType::RightSemi, vec![0], true),
+    )]
     #[tokio::test]
-    async fn test_hash_join_swap_on_joins_with_projections() -> Result<()> {
+    async fn test_hash_join_swap_on_joins_with_projections(
+        join_type: JoinType,
+        projection: Vec<usize>,
+        small_on_right: bool,
+    ) -> Result<()> {
         let (big, small) = create_big_and_small();
+
+        let left = if small_on_right { &big } else { &small };
+        let right = if small_on_right { &small } else { &big };
+
+        let left_on = if small_on_right {
+            "big_col"
+        } else {
+            "small_col"
+        };
+        let right_on = if small_on_right {
+            "small_col"
+        } else {
+            "big_col"
+        };
+
         let join = Arc::new(HashJoinExec::try_new(
-            Arc::clone(&big),
-            Arc::clone(&small),
+            Arc::clone(left),
+            Arc::clone(right),
             vec![(
-                Arc::new(Column::new_with_schema("big_col", &big.schema())?),
-                Arc::new(Column::new_with_schema("small_col", &small.schema())?),
+                Arc::new(Column::new_with_schema(left_on, &left.schema())?),
+                Arc::new(Column::new_with_schema(right_on, &right.schema())?),
             )],
             None,
-            &JoinType::Inner,
-            Some(vec![1]),
+            &join_type,
+            Some(projection),
             PartitionMode::Partitioned,
             false,
         )?);
+
         let swapped = swap_hash_join(&join.clone(), PartitionMode::Partitioned)
             .expect("swap_hash_join must support joins with projections");
         let swapped_join = swapped.as_any().downcast_ref::<HashJoinExec>().expect(
             "ProjectionExec won't be added above if HashJoinExec contains embedded projection",
         );
+
         assert_eq!(swapped_join.projection, Some(vec![0_usize]));
         assert_eq!(swapped.schema().fields.len(), 1);
         assert_eq!(swapped.schema().fields[0].name(), "small_col");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to