alamb commented on code in PR #13910:
URL: https://github.com/apache/datafusion/pull/13910#discussion_r1897999891


##########
datafusion/common/src/join_type.rs:
##########
@@ -73,6 +73,41 @@ impl JoinType {
     pub fn is_outer(self) -> bool {
         self == JoinType::Left || self == JoinType::Right || self == 
JoinType::Full
     }
+
+    /// returns the new join type we get after swapping the given
+    /// join's inputs.
+    ///
+    /// Panics if self.supports_swap() returns false
+    pub fn swap(&self) -> JoinType {

Review Comment:
   These functions were moved here from crate-public functions in join_selection.



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -108,197 +105,49 @@ fn supports_collect_by_thresholds(
 }
 
 /// Predicate that checks whether the given join type supports input swapping.
+#[deprecated(since = "45.0.0", note = "use JoinType::supports_swap instead")]

Review Comment:
   This code has moved elsewhere. I chose to mark the old functions as deprecated to help with my own migration, as well as with forks that might use these functions (e.g. Synnada's).



##########
datafusion/physical-plan/src/joins/join_filter.rs:
##########
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::joins::utils::ColumnIndex;
+use arrow_schema::Schema;
+use datafusion_common::JoinSide;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use std::sync::Arc;
+
+/// Filter applied before join output. Fields are crate-public to allow
+/// downstream implementations to experiment with custom joins.
+#[derive(Debug, Clone)]
+pub struct JoinFilter {

Review Comment:
   joins/utils.rs is already quite large, so I moved this code into its own module.



##########
datafusion/physical-plan/src/joins/cross_join.rs:
##########
@@ -168,6 +169,19 @@ impl CrossJoinExec {
             boundedness_from_children([left, right]),
         )
     }
+
+    /// Returns a new `ExecutionPlan` that computes the same join as this one,
+    /// with the left and right inputs swapped using the  specified
+    /// `partition_mode`.
+    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {

Review Comment:
   This was just refactored out of the swap rule.



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -383,10 +232,10 @@ pub(crate) fn try_collect_left(
 
     match (left_can_collect, right_can_collect) {
         (true, true) => {
-            if supports_swap(*hash_join.join_type())
+            if hash_join.join_type().supports_swap()

Review Comment:
   I think this code now reads more easily.



##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -1788,6 +1730,32 @@ impl BatchTransformer for BatchSplitter {
     }
 }
 
+/// When the order of the join is changed, the output order of columns must
+/// remain the same.
+///
+/// Returns the expressions that will allow to swap back the values from the
+/// original left as the first columns and those on the right next.
+pub(crate) fn swap_reverting_projection(

Review Comment:
   moved from optimizer utils



##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -108,197 +105,49 @@ fn supports_collect_by_thresholds(
 }
 
 /// Predicate that checks whether the given join type supports input swapping.
+#[deprecated(since = "45.0.0", note = "use JoinType::supports_swap instead")]
+#[allow(dead_code)]
 pub(crate) fn supports_swap(join_type: JoinType) -> bool {
-    matches!(
-        join_type,
-        JoinType::Inner
-            | JoinType::Left
-            | JoinType::Right
-            | JoinType::Full
-            | JoinType::LeftSemi
-            | JoinType::RightSemi
-            | JoinType::LeftAnti
-            | JoinType::RightAnti
-    )
+    join_type.supports_swap()
 }
 
 /// This function returns the new join type we get after swapping the given
 /// join's inputs.
+#[deprecated(since = "45.0.0", note = "use datafusion-functions-nested 
instead")]
+#[allow(dead_code)]
 pub(crate) fn swap_join_type(join_type: JoinType) -> JoinType {
-    match join_type {
-        JoinType::Inner => JoinType::Inner,
-        JoinType::Full => JoinType::Full,
-        JoinType::Left => JoinType::Right,
-        JoinType::Right => JoinType::Left,
-        JoinType::LeftSemi => JoinType::RightSemi,
-        JoinType::RightSemi => JoinType::LeftSemi,
-        JoinType::LeftAnti => JoinType::RightAnti,
-        JoinType::RightAnti => JoinType::LeftAnti,
-        JoinType::LeftMark => {
-            unreachable!("LeftMark join type does not support swapping")
-        }
-    }
-}
-
-/// This function swaps the given join's projection.
-fn swap_join_projection(
-    left_schema_len: usize,
-    right_schema_len: usize,
-    projection: Option<&Vec<usize>>,
-    join_type: &JoinType,
-) -> Option<Vec<usize>> {
-    match join_type {
-        // For Anti/Semi join types, projection should remain unmodified,
-        // since these joins output schema remains the same after swap
-        JoinType::LeftAnti
-        | JoinType::LeftSemi
-        | JoinType::RightAnti
-        | JoinType::RightSemi => projection.cloned(),
-
-        _ => projection.map(|p| {
-            p.iter()
-                .map(|i| {
-                    // If the index is less than the left schema length, it is 
from
-                    // the left schema, so we add the right schema length to 
it.
-                    // Otherwise, it is from the right schema, so we subtract 
the left
-                    // schema length from it.
-                    if *i < left_schema_len {
-                        *i + right_schema_len
-                    } else {
-                        *i - left_schema_len
-                    }
-                })
-                .collect()
-        }),
-    }
+    join_type.swap()
 }
 
 /// This function swaps the inputs of the given join operator.
 /// This function is public so other downstream projects can use it
 /// to construct `HashJoinExec` with right side as the build side.
+#[deprecated(since = "45.0.0", note = "use HashJoinExec::swap_inputs instead")]
 pub fn swap_hash_join(
     hash_join: &HashJoinExec,
     partition_mode: PartitionMode,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    let left = hash_join.left();
-    let right = hash_join.right();
-    let new_join = HashJoinExec::try_new(
-        Arc::clone(right),
-        Arc::clone(left),
-        hash_join
-            .on()
-            .iter()
-            .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
-            .collect(),
-        swap_join_filter(hash_join.filter()),
-        &swap_join_type(*hash_join.join_type()),
-        swap_join_projection(
-            left.schema().fields().len(),
-            right.schema().fields().len(),
-            hash_join.projection.as_ref(),
-            hash_join.join_type(),
-        ),
-        partition_mode,
-        hash_join.null_equals_null(),
-    )?;
-    // In case of anti / semi joins or if there is embedded projection in 
HashJoinExec, output column order is preserved, no need to add projection again
-    if matches!(
-        hash_join.join_type(),
-        JoinType::LeftSemi
-            | JoinType::RightSemi
-            | JoinType::LeftAnti
-            | JoinType::RightAnti
-    ) || hash_join.projection.is_some()
-    {
-        Ok(Arc::new(new_join))
-    } else {
-        // TODO avoid adding ProjectionExec again and again, only adding Final 
Projection
-        let proj = ProjectionExec::try_new(
-            swap_reverting_projection(&left.schema(), &right.schema()),
-            Arc::new(new_join),
-        )?;
-        Ok(Arc::new(proj))
-    }
+    hash_join.swap_inputs(partition_mode)

Review Comment:
   This is the key point of this PR: to move this function (and the related ones) into `HashJoinExec` itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to