This is an automated email from the ASF dual-hosted git repository. berkay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 3d6541d119 minor: Improve equivalence handling of joins (#16893) 3d6541d119 is described below commit 3d6541d119cd11bf56b368960bcbc8b575b79193 Author: Berkay Şahin <124376117+berkaysynn...@users.noreply.github.com> AuthorDate: Mon Aug 11 09:01:36 2025 +0300 minor: Improve equivalence handling of joins (#16893) * minor * Update joins.slt --- .../tests/physical_optimizer/join_selection.rs | 7 +++--- .../src/equivalence/properties/joins.rs | 8 +++++-- datafusion/physical-plan/src/joins/test_utils.rs | 25 ++++++++++++++++++++++ datafusion/physical-plan/src/joins/utils.rs | 4 ++-- datafusion/sqllogictest/test_files/joins.slt | 21 ++++++++++++++++++ 5 files changed, 57 insertions(+), 8 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 3477ac7712..ee647e0019 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -35,9 +35,7 @@ use datafusion_physical_expr::expressions::{BinaryExpr, Column, NegativeExpr}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; -use datafusion_physical_optimizer::join_selection::{ - hash_join_swap_subrule, JoinSelection, -}; +use datafusion_physical_optimizer::join_selection::JoinSelection; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::displayable; use datafusion_physical_plan::joins::utils::ColumnIndex; @@ -1501,7 +1499,8 @@ async fn test_join_with_maybe_swap_unbounded_case(t: TestCase) -> Result<()> { NullEquality::NullEqualsNothing, )?) as _; - let optimized_join_plan = hash_join_swap_subrule(join, &ConfigOptions::new())?; + let optimized_join_plan = + JoinSelection::new().optimize(Arc::clone(&join), &ConfigOptions::new())?; // If swap did happen let projection_added = optimized_join_plan.as_any().is::<ProjectionExec>(); diff --git a/datafusion/physical-expr/src/equivalence/properties/joins.rs b/datafusion/physical-expr/src/equivalence/properties/joins.rs index 9329ce56b7..485b11d586 100644 --- a/datafusion/physical-expr/src/equivalence/properties/joins.rs +++ b/datafusion/physical-expr/src/equivalence/properties/joins.rs @@ -52,7 +52,9 @@ pub fn join_equivalence_properties( [true, false] => { // In this special case, right side ordering can be prefixed with // the left side ordering. - if let (Some(JoinSide::Left), JoinType::Inner) = (probe_side, join_type) { + if matches!(join_type, JoinType::Inner | JoinType::Left) + && probe_side == Some(JoinSide::Left) + { updated_right_ordering_equivalence_class( &mut right_oeq_class, join_type, @@ -81,7 +83,9 @@ pub fn join_equivalence_properties( )?; // In this special case, left side ordering can be prefixed with // the right side ordering. - if let (Some(JoinSide::Right), JoinType::Inner) = (probe_side, join_type) { + if matches!(join_type, JoinType::Inner | JoinType::Right) + && probe_side == Some(JoinSide::Right) + { // Left side ordering equivalence properties should be prepended // with those of the right side while constructing output ordering // equivalence properties since stream side is the right side. diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs index ea893cc933..de288724c4 100644 --- a/datafusion/physical-plan/src/joins/test_utils.rs +++ b/datafusion/physical-plan/src/joins/test_utils.rs @@ -417,12 +417,14 @@ pub fn build_sides_record_batches( key_cardinality: (i32, i32), ) -> Result<(RecordBatch, RecordBatch)> { let null_ratio: f64 = 0.4; + let duplicate_ratio = 0.4; let initial_range = 0..table_size; let index = (table_size as f64 * null_ratio).round() as i32; let rest_of = index..table_size; let ordered: ArrayRef = Arc::new(Int32Array::from_iter( initial_range.clone().collect::<Vec<i32>>(), )); + let random_ordered = generate_ordered_array(table_size, duplicate_ratio); let ordered_des = Arc::new(Int32Array::from_iter( initial_range.clone().rev().collect::<Vec<i32>>(), )); @@ -501,6 +503,7 @@ pub fn build_sides_record_batches( ), ("li1", Arc::clone(&interval_time)), ("l_float", Arc::clone(&float_asc) as ArrayRef), + ("l_random_ordered", Arc::clone(&random_ordered) as ArrayRef), ])?; let right = RecordBatch::try_from_iter(vec![ ("ra1", Arc::clone(&ordered)), @@ -514,6 +517,7 @@ pub fn build_sides_record_batches( ("r_desc_null_first", ordered_desc_null_first), ("ri1", interval_time), ("r_float", float_asc), + ("r_random_ordered", random_ordered), ])?; Ok((left, right)) } @@ -583,3 +587,24 @@ pub(crate) fn complicated_filter( )?; binary(left_expr, Operator::And, right_expr, filter_schema) } + +fn generate_ordered_array(size: i32, duplicate_ratio: f32) -> Arc<Int32Array> { + let mut rng = StdRng::seed_from_u64(42); + let unique_count = (size as f32 * (1.0 - duplicate_ratio)) as i32; + + // Generate unique random values + let mut values: Vec<i32> = (0..unique_count) + .map(|_| rng.random_range(1..500)) // Modify as per your range + .collect(); + + // Duplicate the values according to the duplicate ratio + for _ in 0..(size - unique_count) { + let index = rng.random_range(0..unique_count); + values.push(values[index as usize]); + } + + // Sort the values to ensure they are ordered + values.sort(); + + Arc::new(Int32Array::from_iter(values)) +} diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 35827d4fcd..65e7a6106f 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1545,7 +1545,7 @@ impl BatchTransformer for BatchSplitter { /// Joins output columns from their left input followed by their right input. /// Thus if the inputs are reordered, the output columns must be reordered to /// match the original order. -pub(crate) fn reorder_output_after_swap( +pub fn reorder_output_after_swap( plan: Arc<dyn ExecutionPlan>, left_schema: &Schema, right_schema: &Schema, @@ -1584,7 +1584,7 @@ fn swap_reverting_projection( } /// This function swaps the given join's projection. -pub(super) fn swap_join_projection( +pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, projection: Option<&Vec<usize>>, diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 5d68ed35b2..e7beec17e9 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3348,6 +3348,27 @@ physical_plan 05)------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] 06)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +# Test ordering preservation for RIGHT join +query TT +EXPLAIN SELECT * +FROM annotated_data as l_table +RIGHT JOIN (SELECT * FROM annotated_data) as r_table +ON l_table.b = r_table.b +ORDER BY r_table.a ASC NULLS FIRST, r_table.b, r_table.c, l_table.a ASC NULLS FIRST; +---- +logical_plan +01)Sort: r_table.a ASC NULLS FIRST, r_table.b ASC NULLS LAST, r_table.c ASC NULLS LAST, l_table.a ASC NULLS FIRST +02)--Right Join: l_table.b = r_table.b +03)----SubqueryAlias: l_table +04)------TableScan: annotated_data projection=[a0, a, b, c, d] +05)----SubqueryAlias: r_table +06)------TableScan: annotated_data projection=[a0, a, b, c, d] +physical_plan +01)CoalesceBatchesExec: target_batch_size=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(b@2, b@2)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true + query TT EXPLAIN SELECT l.a, LAST_VALUE(r.b ORDER BY r.a ASC NULLS FIRST) as last_col1 FROM annotated_data as l --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org