This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c25bd079f Adjustment of HashJoinExec APIs to Preserve Probe Side 
Order (#6858)
7c25bd079f is described below

commit 7c25bd079f9e407ba27ec679efb04eff6b4b4995
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Jul 7 09:39:48 2023 +0300

    Adjustment of HashJoinExec APIs to Preserve Probe Side Order (#6858)
    
    * Implementation and tests
    
    * Update 
datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
    
    Co-authored-by: Mustafa Akur 
<[email protected]>
    
    * Visit comments
    
    * Update hash_join.rs
    
    ---------
    
    Co-authored-by: Mustafa Akur 
<[email protected]>
---
 .../src/physical_optimizer/sort_enforcement.rs     |  53 ++++++++++-
 .../core/src/physical_optimizer/sort_pushdown.rs   |  19 +++-
 .../core/src/physical_plan/joins/hash_join.rs      |  43 ++++++++-
 datafusion/core/src/physical_plan/joins/utils.rs   |  59 +++++++++++-
 .../test_files/join_disable_repartition_joins.slt  | 103 +++++++++++++++++++++
 5 files changed, 265 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 719c152841..dc1f27738d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -973,8 +973,8 @@ mod tests {
     use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
     use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::filter::FilterExec;
-    use crate::physical_plan::joins::utils::JoinOn;
-    use crate::physical_plan::joins::SortMergeJoinExec;
+    use crate::physical_plan::joins::utils::{JoinFilter, JoinOn};
+    use crate::physical_plan::joins::{HashJoinExec, PartitionMode, 
SortMergeJoinExec};
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use crate::physical_plan::memory::MemoryExec;
     use crate::physical_plan::repartition::RepartitionExec;
@@ -1699,6 +1699,37 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_remove_unnecessary_sort5() -> Result<()> {
+        let left_schema = create_test_schema2()?;
+        let right_schema = create_test_schema3()?;
+        let left_input = memory_exec(&left_schema);
+        let parquet_sort_exprs = vec![sort_expr("a", &right_schema)];
+        let right_input = parquet_exec_sorted(&right_schema, 
parquet_sort_exprs);
+
+        let on = vec![(
+            Column::new_with_schema("col_a", &left_schema)?,
+            Column::new_with_schema("c", &right_schema)?,
+        )];
+        let join = hash_join_exec(left_input, right_input, on, None, 
&JoinType::Inner)?;
+        let physical_plan = sort_exec(vec![sort_expr("a", &join.schema())], 
join);
+
+        let expected_input = vec![
+            "SortExec: expr=[a@2 ASC]",
+            "  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0, 
c@2)]",
+            "    MemoryExec: partitions=0, partition_sizes=[]",
+            "    ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC]",
+        ];
+
+        let expected_optimized = vec![
+            "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0, 
c@2)]",
+            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[a@0 ASC]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_remove_unnecessary_spm1() -> Result<()> {
         let schema = create_test_schema()?;
@@ -2759,6 +2790,24 @@ mod tests {
         Arc::new(SortExec::new(sort_exprs, input))
     }
 
+    fn hash_join_exec(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        on: JoinOn,
+        filter: Option<JoinFilter>,
+        join_type: &JoinType,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(HashJoinExec::try_new(
+            left,
+            right,
+            on,
+            filter,
+            join_type,
+            PartitionMode::Partitioned,
+            true,
+        )?))
+    }
+
     fn sort_preserving_merge_exec(
         sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs 
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 20a5038b7a..06260eb4e1 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -17,7 +17,7 @@
 use crate::physical_optimizer::utils::{add_sort_above, is_limit, is_union, 
is_window};
 use crate::physical_plan::filter::FilterExec;
 use crate::physical_plan::joins::utils::JoinSide;
-use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::joins::{HashJoinExec, SortMergeJoinExec};
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort::SortExec;
@@ -263,16 +263,25 @@ fn pushdown_requirement_to_children(
         // TODO: Add support for Projection push down
         || plan.as_any().is::<ProjectionExec>()
         || is_limit(plan)
+        || plan.as_any().is::<HashJoinExec>()
     {
        // If the current plan is a leaf node or cannot maintain any of the 
input ordering, we cannot push down requirements.
        // For RepartitionExec, we always choose to not push down the sort 
requirements even though the RepartitionExec(input_partition=1) could maintain input 
ordering.
        // Pushing down is not beneficial
         Ok(None)
     } else {
-        Ok(Some(vec![
-            parent_required.map(|elem| elem.to_vec());
-            plan.children().len()
-        ]))
+        Ok(Some(
+            maintains_input_order
+                .iter()
+                .map(|flag| {
+                    if *flag {
+                        parent_required.map(|elem| elem.to_vec())
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>(),
+        ))
     }
     // TODO: Add support for Projection push down
 }
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs 
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index d258e7529a..47403a70f4 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -58,7 +58,8 @@ use datafusion_execution::memory_pool::MemoryReservation;
 
 use crate::physical_plan::joins::utils::{
     adjust_indices_by_join_type, apply_join_filter_to_indices, 
build_batch_from_indices,
-    get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
+    calculate_hash_join_output_order, get_final_indices_from_bit_map,
+    need_produce_result_in_final, JoinSide,
 };
 use crate::physical_plan::DisplayAs;
 use crate::physical_plan::{
@@ -115,6 +116,8 @@ pub struct HashJoinExec {
     left_fut: OnceAsync<JoinLeftData>,
     /// Shares the `RandomState` for the hashing algorithm
     random_state: RandomState,
+    /// Output order
+    output_order: Option<Vec<PhysicalSortExpr>>,
     /// Partitioning mode to use
     pub(crate) mode: PartitionMode,
     /// Execution metrics
@@ -153,6 +156,13 @@ impl HashJoinExec {
 
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
 
+        let output_order = calculate_hash_join_output_order(
+            join_type,
+            left.output_ordering(),
+            right.output_ordering(),
+            left.schema().fields().len(),
+        )?;
+
         Ok(HashJoinExec {
             left,
             right,
@@ -166,6 +176,7 @@ impl HashJoinExec {
             metrics: ExecutionPlanMetricsSet::new(),
             column_indices,
             null_equals_null,
+            output_order,
         })
     }
 
@@ -328,10 +339,34 @@ impl ExecutionPlan for HashJoinExec {
         }
     }
 
-    // TODO Output ordering might be kept for some cases.
-    // For example if it is inner join then the stream side order can be kept
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        self.output_order.as_deref()
+    }
+
+    // For [JoinType::Inner] and [JoinType::RightSemi] hash joins, the 
probe phase starts by
+    // applying the hash function to the join key(s) of each row from the 
probe side table,
+    // in the order the rows are arranged. The resulting hash value is used 
to look up corresponding
+    // entries in the hash table that was constructed from the build side 
table during the build phase.
+    //
+    // Because of the immediate generation of result rows once a match is 
found,
+    // the output of the join tends to follow the order in which the rows were 
read from
+    // the probe side table. This is simply due to the sequence in which the 
rows were processed.
+    // Hence, it appears that the hash join is preserving the order of the 
probe side.
+    //
+    // Meanwhile, in the case of a [JoinType::RightAnti] hash join,
+    // the unmatched rows from the probe side are also kept in order.
+    // This is because the **`RightAnti`** join is designed to return rows 
from the right
+    // (probe side) table that have no match in the left (build side) table. 
Because the rows
+    // are processed sequentially in the probe phase, and unmatched rows are 
directly output
+    // as results, these results tend to retain the order of the probe side 
table.
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![
+            false,
+            matches!(
+                self.join_type,
+                JoinType::Inner | JoinType::RightAnti | JoinType::RightSemi
+            ),
+        ]
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs 
b/datafusion/core/src/physical_plan/joins/utils.rs
index 627bdeebc5..d4be2cfe10 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -40,7 +40,7 @@ use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{ScalarValue, SharedResult};
 
 use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_physical_expr::{EquivalentClass, PhysicalExpr};
+use datafusion_physical_expr::{EquivalentClass, PhysicalExpr, 
PhysicalSortExpr};
 
 use datafusion_common::JoinType;
 use datafusion_common::{DataFusionError, Result};
@@ -147,6 +147,63 @@ pub fn adjust_right_output_partitioning(
     }
 }
 
+fn adjust_right_order(
+    right_order: &[PhysicalSortExpr],
+    left_columns_len: usize,
+) -> Result<Vec<PhysicalSortExpr>> {
+    right_order
+        .iter()
+        .map(|sort_expr| {
+            let expr = sort_expr.expr.clone();
+            let adjusted = expr.transform_up(&|expr| {
+                Ok(
+                    if let Some(column) = 
expr.as_any().downcast_ref::<Column>() {
+                        let new_col =
+                            Column::new(column.name(), column.index() + 
left_columns_len);
+                        Transformed::Yes(Arc::new(new_col))
+                    } else {
+                        Transformed::No(expr)
+                    },
+                )
+            })?;
+            Ok(PhysicalSortExpr {
+                expr: adjusted,
+                options: sort_expr.options,
+            })
+        })
+        .collect::<Result<Vec<_>>>()
+}
+
+/// Calculate the output order for hash join.
+pub fn calculate_hash_join_output_order(
+    join_type: &JoinType,
+    maybe_left_order: Option<&[PhysicalSortExpr]>,
+    maybe_right_order: Option<&[PhysicalSortExpr]>,
+    left_len: usize,
+) -> Result<Option<Vec<PhysicalSortExpr>>> {
+    match maybe_right_order {
+        Some(right_order) => {
+            let result = match join_type {
+                JoinType::Inner => {
+                    // We modify the indices of the right order columns 
because their
+                    // columns are appended to the right side of the left 
schema.
+                    let mut adjusted_right_order =
+                        adjust_right_order(right_order, left_len)?;
+                    if let Some(left_order) = maybe_left_order {
+                        adjusted_right_order.extend_from_slice(left_order);
+                    }
+                    Some(adjusted_right_order)
+                }
+                JoinType::RightAnti | JoinType::RightSemi => 
Some(right_order.to_vec()),
+                _ => None,
+            };
+
+            Ok(result)
+        }
+        None => Ok(None),
+    }
+}
+
 /// Combine the Equivalence Properties for Join Node
 pub fn combine_join_equivalence_properties(
     join_type: JoinType,
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
 
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
index 5f680fcae7..fcc6d665c6 100644
--- 
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
+++ 
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
@@ -25,6 +25,109 @@ set datafusion.optimizer.repartition_joins = false;
 
 include ./join.slt
 
+statement ok
+CREATE EXTERNAL TABLE annotated_data (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+query TT
+EXPLAIN SELECT t2.a
+ FROM annotated_data as t1
+ INNER JOIN annotated_data as t2
+ ON t1.c = t2.c ORDER BY t2.a
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: t2.a ASC NULLS LAST, fetch=5
+----Projection: t2.a
+------Inner Join: t1.c = t2.c
+--------SubqueryAlias: t1
+----------TableScan: annotated_data projection=[c]
+--------SubqueryAlias: t2
+----------TableScan: annotated_data projection=[a, c]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[a@1 as a]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], 
has_header=true
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+# preserve_inner_join
+query III nosort
+SELECT t1.a, t2.a as a2, t2.b
+ FROM annotated_data as t1
+ INNER JOIN annotated_data as t2
+ ON t1.d = t2.d ORDER BY a2, t2.b
+ LIMIT 5
+----
+1 0 0
+1 0 0
+1 0 0
+1 0 0
+1 0 0
+
+query TT
+EXPLAIN SELECT t2.a as a2, t2.b
+    FROM annotated_data as t1
+    RIGHT SEMI JOIN annotated_data as t2
+    ON t1.d = t2.d AND t1.c = t2.c
+    WHERE t2.d = 3
+    ORDER BY a2, t2.b
+LIMIT 10
+----
+logical_plan
+Limit: skip=0, fetch=10
+--Sort: a2 ASC NULLS LAST, t2.b ASC NULLS LAST, fetch=10
+----Projection: t2.a AS a2, t2.b
+------RightSemi Join: t1.d = t2.d, t1.c = t2.c
+--------SubqueryAlias: t1
+----------TableScan: annotated_data projection=[c, d]
+--------SubqueryAlias: t2
+----------Filter: annotated_data.d = Int32(3)
+------------TableScan: annotated_data projection=[a, b, c, d], 
partial_filters=[annotated_data.d = Int32(3)]
+physical_plan
+GlobalLimitExec: skip=0, fetch=10
+--SortPreservingMergeExec: [a2@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10
+----ProjectionExec: expr=[a@0 as a2, b@1 as b]
+------CoalesceBatchesExec: target_batch_size=8192
+--------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), 
(c@0, c@2)]
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], 
has_header=true
+----------CoalesceBatchesExec: target_batch_size=8192
+------------FilterExec: d@3 = 3
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, 
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS 
LAST], has_header=true
+
+# preserve_right_semi_join
+query II nosort
+SELECT t2.a as a2, t2.b
+    FROM annotated_data as t1
+    RIGHT SEMI JOIN annotated_data as t2
+    ON t1.d = t2.d AND t1.c = t2.c
+    WHERE t2.d = 3
+    ORDER BY a2, t2.b
+LIMIT 10
+----
+0 0
+0 0
+0 0
+0 1
+0 1
+0 1
+0 1
+0 1
+1 2
+1 2
+
 # turn on repartition_joins
 statement ok
 set datafusion.optimizer.repartition_joins = true;

Reply via email to