This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d6541d119 minor: Improve equivalence handling of joins (#16893)
3d6541d119 is described below

commit 3d6541d119cd11bf56b368960bcbc8b575b79193
Author: Berkay Şahin <124376117+berkaysynn...@users.noreply.github.com>
AuthorDate: Mon Aug 11 09:01:36 2025 +0300

    minor: Improve equivalence handling of joins (#16893)
    
    * minor
    
    * Update joins.slt
---
 .../tests/physical_optimizer/join_selection.rs     |  7 +++---
 .../src/equivalence/properties/joins.rs            |  8 +++++--
 datafusion/physical-plan/src/joins/test_utils.rs   | 25 ++++++++++++++++++++++
 datafusion/physical-plan/src/joins/utils.rs        |  4 ++--
 datafusion/sqllogictest/test_files/joins.slt       | 21 ++++++++++++++++++
 5 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs
index 3477ac7712..ee647e0019 100644
--- a/datafusion/core/tests/physical_optimizer/join_selection.rs
+++ b/datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -35,9 +35,7 @@ use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr};
 use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::PhysicalExprRef;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, 
PhysicalExpr};
-use datafusion_physical_optimizer::join_selection::{
-    hash_join_swap_subrule, JoinSelection,
-};
+use datafusion_physical_optimizer::join_selection::JoinSelection;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::displayable;
 use datafusion_physical_plan::joins::utils::ColumnIndex;
@@ -1501,7 +1499,8 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> {
         NullEquality::NullEqualsNothing,
     )?) as _;
 
-    let optimized_join_plan = hash_join_swap_subrule(join, 
&ConfigOptions::new())?;
+    let optimized_join_plan =
+        JoinSelection::new().optimize(Arc::clone(&join), 
&ConfigOptions::new())?;
 
     // If swap did happen
     let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>();
diff --git a/datafusion/physical-expr/src/equivalence/properties/joins.rs b/datafusion/physical-expr/src/equivalence/properties/joins.rs
index 9329ce56b7..485b11d586 100644
--- a/datafusion/physical-expr/src/equivalence/properties/joins.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/joins.rs
@@ -52,7 +52,9 @@ pub fn join_equivalence_properties(
         [true, false] => {
             // In this special case, right side ordering can be prefixed with
             // the left side ordering.
-            if let (Some(JoinSide::Left), JoinType::Inner) = (probe_side, 
join_type) {
+            if matches!(join_type, JoinType::Inner | JoinType::Left)
+                && probe_side == Some(JoinSide::Left)
+            {
                 updated_right_ordering_equivalence_class(
                     &mut right_oeq_class,
                     join_type,
@@ -81,7 +83,9 @@ pub fn join_equivalence_properties(
             )?;
             // In this special case, left side ordering can be prefixed with
             // the right side ordering.
-            if let (Some(JoinSide::Right), JoinType::Inner) = (probe_side, 
join_type) {
+            if matches!(join_type, JoinType::Inner | JoinType::Right)
+                && probe_side == Some(JoinSide::Right)
+            {
                 // Left side ordering equivalence properties should be 
prepended
                 // with those of the right side while constructing output 
ordering
                 // equivalence properties since stream side is the right side.
diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs
index ea893cc933..de288724c4 100644
--- a/datafusion/physical-plan/src/joins/test_utils.rs
+++ b/datafusion/physical-plan/src/joins/test_utils.rs
@@ -417,12 +417,14 @@ pub fn build_sides_record_batches(
     key_cardinality: (i32, i32),
 ) -> Result<(RecordBatch, RecordBatch)> {
     let null_ratio: f64 = 0.4;
+    let duplicate_ratio = 0.4;
     let initial_range = 0..table_size;
     let index = (table_size as f64 * null_ratio).round() as i32;
     let rest_of = index..table_size;
     let ordered: ArrayRef = Arc::new(Int32Array::from_iter(
         initial_range.clone().collect::<Vec<i32>>(),
     ));
+    let random_ordered = generate_ordered_array(table_size, duplicate_ratio);
     let ordered_des = Arc::new(Int32Array::from_iter(
         initial_range.clone().rev().collect::<Vec<i32>>(),
     ));
@@ -501,6 +503,7 @@ pub fn build_sides_record_batches(
         ),
         ("li1", Arc::clone(&interval_time)),
         ("l_float", Arc::clone(&float_asc) as ArrayRef),
+        ("l_random_ordered", Arc::clone(&random_ordered) as ArrayRef),
     ])?;
     let right = RecordBatch::try_from_iter(vec![
         ("ra1", Arc::clone(&ordered)),
@@ -514,6 +517,7 @@ pub fn build_sides_record_batches(
         ("r_desc_null_first", ordered_desc_null_first),
         ("ri1", interval_time),
         ("r_float", float_asc),
+        ("r_random_ordered", random_ordered),
     ])?;
     Ok((left, right))
 }
@@ -583,3 +587,24 @@ pub(crate) fn complicated_filter(
     )?;
     binary(left_expr, Operator::And, right_expr, filter_schema)
 }
+
+/// Returns `size` ascending values drawn from `1..500` (fixed seed 42); roughly a
+/// `duplicate_ratio` share of them are copies of earlier draws.
+fn generate_ordered_array(size: i32, duplicate_ratio: f32) -> Arc<Int32Array> {
+    let mut rng = StdRng::seed_from_u64(42);
+    // Keep >= 1 seed value when `size > 0` (else `random_range(0..0)` panics below)
+    // and at most `size`, so `size >= 0` always yields exactly `size` values.
+    let unique_count =
+        ((size as f32 * (1.0 - duplicate_ratio)) as i32).clamp(size.min(1), size);
+    // Independent draws: these seed values are not necessarily distinct.
+    let mut values: Vec<i32> =
+        (0..unique_count).map(|_| rng.random_range(1..500)).collect();
+    // Fill the remaining slots with copies of randomly chosen seed values.
+    for _ in 0..(size - unique_count) {
+        let index = rng.random_range(0..unique_count);
+        values.push(values[index as usize]);
+    }
+    // Sort so the column carries an ascending ordering.
+    values.sort();
+    Arc::new(Int32Array::from_iter(values))
+}
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 35827d4fcd..65e7a6106f 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1545,7 +1545,7 @@ impl BatchTransformer for BatchSplitter {
 /// Joins output columns from their left input followed by their right input.
 /// Thus if the inputs are reordered, the output columns must be reordered to
 /// match the original order.
-pub(crate) fn reorder_output_after_swap(
+pub fn reorder_output_after_swap(
     plan: Arc<dyn ExecutionPlan>,
     left_schema: &Schema,
     right_schema: &Schema,
@@ -1584,7 +1584,7 @@ fn swap_reverting_projection(
 }
 
 /// This function swaps the given join's projection.
-pub(super) fn swap_join_projection(
+pub fn swap_join_projection(
     left_schema_len: usize,
     right_schema_len: usize,
     projection: Option<&Vec<usize>>,
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 5d68ed35b2..e7beec17e9 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3348,6 +3348,27 @@ physical_plan
 05)------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
 06)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
 
+# Test ordering preservation for RIGHT join
+query TT
+EXPLAIN SELECT *
+FROM annotated_data as l_table
+RIGHT JOIN (SELECT * FROM annotated_data) as r_table
+ON l_table.b = r_table.b
+ORDER BY r_table.a ASC NULLS FIRST, r_table.b, r_table.c, l_table.a ASC NULLS 
FIRST;
+----
+logical_plan
+01)Sort: r_table.a ASC NULLS FIRST, r_table.b ASC NULLS LAST, r_table.c ASC NULLS LAST, l_table.a ASC NULLS FIRST
+02)--Right Join: l_table.b = r_table.b
+03)----SubqueryAlias: l_table
+04)------TableScan: annotated_data projection=[a0, a, b, c, d]
+05)----SubqueryAlias: r_table
+06)------TableScan: annotated_data projection=[a0, a, b, c, d]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=2
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@2, b@2)]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+
 query TT
 EXPLAIN SELECT l.a, LAST_VALUE(r.b ORDER BY r.a ASC NULLS FIRST) as last_col1
 FROM annotated_data as l


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to