This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 38ccb00710 Add swap_inputs to SMJ (#13984)
38ccb00710 is described below

commit 38ccb0071045be1fae672ce2561c001f5d505efb
Author: Mehmet Ozan Kabak <[email protected]>
AuthorDate: Thu Jan 2 18:20:57 2025 +0300

    Add swap_inputs to SMJ (#13984)
---
 .../core/src/physical_optimizer/join_selection.rs  |  4 ++-
 .../physical-plan/src/joins/sort_merge_join.rs     | 36 ++++++++++++++++++++--
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index d7a2f17401..736c3fbd01 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -552,7 +552,9 @@ fn hash_join_swap_subrule(
 /// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
 /// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
 /// we swap join sides. Therefore, we do not consider them here.
-fn swap_join_according_to_unboundedness(
+/// This function is crate public as it is useful for downstream projects
+/// to implement, or experiment with, their own join selection rules.
+pub(crate) fn swap_join_according_to_unboundedness(
     hash_join: &HashJoinExec,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let partition_mode = hash_join.partition_mode();
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 438d981847..54bd63084e 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -58,7 +58,8 @@ use crate::execution_plan::{boundedness_from_children, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::joins::utils::{
     build_join_schema, check_join_is_valid, estimate_join_statistics,
-    symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef,
+    reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter, JoinOn,
+    JoinOnRef,
 };
 use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use crate::spill::spill_record_batches;
@@ -73,7 +74,7 @@ use futures::{Stream, StreamExt};
 /// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
 /// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
 /// inputs where one or both of the inputs don't fit in the available memory.
-///  
+///
 /// # Join Expressions
 ///
 /// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
@@ -311,6 +312,37 @@ impl SortMergeJoinExec {
             boundedness_from_children([left, right]),
         )
     }
+
+    pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
+        let left = self.left();
+        let right = self.right();
+        let new_join = SortMergeJoinExec::try_new(
+            Arc::clone(right),
+            Arc::clone(left),
+            self.on()
+                .iter()
+                .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
+                .collect::<Vec<_>>(),
+            self.filter().as_ref().map(JoinFilter::swap),
+            self.join_type().swap(),
+            self.sort_options.clone(),
+            self.null_equals_null,
+        )?;
+
+        // TODO: OR this condition with having a built-in projection (like
+        //       ordinary hash join) when we support it.
+        if matches!(
+            self.join_type(),
+            JoinType::LeftSemi
+                | JoinType::RightSemi
+                | JoinType::LeftAnti
+                | JoinType::RightAnti
+        ) {
+            Ok(Arc::new(new_join))
+        } else {
+            reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
+        }
+    }
 }
 
 impl DisplayAs for SortMergeJoinExec {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to