This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 38ccb00710 Add swap_inputs to SMJ (#13984)
38ccb00710 is described below
commit 38ccb0071045be1fae672ce2561c001f5d505efb
Author: Mehmet Ozan Kabak <[email protected]>
AuthorDate: Thu Jan 2 18:20:57 2025 +0300
Add swap_inputs to SMJ (#13984)
---
.../core/src/physical_optimizer/join_selection.rs | 4 ++-
.../physical-plan/src/joins/sort_merge_join.rs | 36 ++++++++++++++++++++--
2 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index d7a2f17401..736c3fbd01 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -552,7 +552,9 @@ fn hash_join_swap_subrule(
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`] and
/// [`JoinType::RightSemi`] can not run with an unbounded left side, even if
/// we swap join sides. Therefore, we do not consider them here.
-fn swap_join_according_to_unboundedness(
+/// This function is crate public as it is useful for implementing, or
+/// experimenting with, other join selection rules. Note that `pub(crate)`
+/// does not expose it to downstream crates.
+pub(crate) fn swap_join_according_to_unboundedness(
hash_join: &HashJoinExec,
) -> Result<Arc<dyn ExecutionPlan>> {
let partition_mode = hash_join.partition_mode();
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 438d981847..54bd63084e 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -58,7 +58,8 @@ use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::joins::utils::{
build_join_schema, check_join_is_valid, estimate_join_statistics,
- symmetric_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef,
+ reorder_output_after_swap, symmetric_join_output_partitioning, JoinFilter,
JoinOn,
+ JoinOnRef,
};
use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder,
MetricsSet};
use crate::spill::spill_record_batches;
@@ -73,7 +74,7 @@ use futures::{Stream, StreamExt};
/// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge
/// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large
/// inputs where one or both of the inputs don't fit in the available memory.
-///
+///
/// # Join Expressions
///
/// Equi-join predicate (e.g. `<col1> = <col2>`) expressions are represented by [`Self::on`].
@@ -311,6 +312,37 @@ impl SortMergeJoinExec {
boundedness_from_children([left, right]),
)
}
+
+ /// Builds an equivalent sort-merge join with the left and right inputs exchanged.
+ ///
+ /// The new join takes the current `right` child as its left input and the
+ /// current `left` child as its right input. Every `on` key pair is flipped,
+ /// the optional filter is swapped with `JoinFilter::swap`, and the join type
+ /// is swapped with `JoinType::swap`. The sort options and the
+ /// `null_equals_null` flag are carried over unchanged.
+ ///
+ /// For `LeftSemi`, `RightSemi`, `LeftAnti` and `RightAnti` joins the swapped
+ /// join is returned directly. For every other join type it is passed to
+ /// `reorder_output_after_swap` together with the original left and right
+ /// schemas (presumably to restore the original output column order —
+ /// confirm against that helper).
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if `SortMergeJoinExec::try_new` rejects the swapped
+ /// configuration, or if `reorder_output_after_swap` fails.
+ pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
+ let left = self.left();
+ let right = self.right();
+ let new_join = SortMergeJoinExec::try_new(
+ Arc::clone(right),
+ Arc::clone(left),
+ self.on()
+ .iter()
+ // Each (left_key, right_key) pair becomes (right_key, left_key) so
+ // the keys line up with the exchanged children.
+ .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
+ .collect::<Vec<_>>(),
+ self.filter().as_ref().map(JoinFilter::swap),
+ self.join_type().swap(),
+ self.sort_options.clone(),
+ self.null_equals_null,
+ )?;
+
+ // Semi/anti joins are returned without reordering; presumably their output
+ // holds columns from only one input, so the swapped join's schema already
+ // matches the original — NOTE(review): confirm.
+ // TODO: OR this condition with having a built-in projection (like
+ // ordinary hash join) when we support it.
+ if matches!(
+ self.join_type(),
+ JoinType::LeftSemi
+ | JoinType::RightSemi
+ | JoinType::LeftAnti
+ | JoinType::RightAnti
+ ) {
+ Ok(Arc::new(new_join))
+ } else {
+ reorder_output_after_swap(Arc::new(new_join), &left.schema(),
&right.schema())
+ }
+ }
}
impl DisplayAs for SortMergeJoinExec {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]