This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e9816b07e HashJoin order fixing (#7155)
1e9816b07e is described below

commit 1e9816b07edd8e73bb45ababa20a7c22de492d96
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Wed Aug 2 10:35:28 2023 +0300

    HashJoin order fixing (#7155)
    
    * Order fixing
    
    * Update join_disable_repartition_joins.slt
    
    * Update hash_join.rs
    
    * Update hash_join.rs
    
    * Update hash_join.rs
---
 .../core/src/physical_plan/joins/hash_join.rs      | 41 +++++++++++++++++++---
 .../test_files/join_disable_repartition_joins.slt  | 14 ++++----
 2 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs 
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 6159a64be3..b97f2806e6 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -716,14 +716,14 @@ impl RecordBatchStream for HashJoinStream {
 // "+----+----+-----+----+----+-----+",
 // "| a1 | b1 | c1  | a2 | b2 | c2  |",
 // "+----+----+-----+----+----+-----+",
+// "| 9  | 8  | 90  | 8  | 8  | 80  |",
 // "| 11 | 8  | 110 | 8  | 8  | 80  |",
 // "| 13 | 10 | 130 | 10 | 10 | 100 |",
 // "| 13 | 10 | 130 | 12 | 10 | 120 |",
-// "| 9  | 8  | 90  | 8  | 8  | 80  |",
 // "+----+----+-----+----+----+-----+"
 // And the result of build and probe indices are:
-// Build indices:  5, 6, 6, 4
-// Probe indices: 3, 4, 5, 3
+// Build indices: 4, 5, 6, 6
+// Probe indices: 3, 3, 4, 5
 #[allow(clippy::too_many_arguments)]
 pub fn build_equal_condition_join_indices(
     build_hashmap: &JoinHashMap,
@@ -754,8 +754,36 @@ pub fn build_equal_condition_join_indices(
     // Using a buffer builder to avoid slower normal builder
     let mut build_indices = UInt64BufferBuilder::new(0);
     let mut probe_indices = UInt32BufferBuilder::new(0);
-    // Visit all of the probe rows
-    for (row, hash_value) in hash_values.iter().enumerate() {
+    // The chained list algorithm generates build indices for each probe row 
in a reversed sequence as such:
+    // Build Indices: [5, 4, 3]
+    // Probe Indices: [1, 1, 1]
+    //
+    // This affects the output sequence. Hypothetically, it's possible to 
preserve the lexicographic order on the build side.
+    // Let's consider probe rows [0,1] as an example:
+    //
+    // When the probe iteration sequence is reversed, the following pairings 
can be derived:
+    //
+    // For probe row 1:
+    //     (5, 1)
+    //     (4, 1)
+    //     (3, 1)
+    //
+    // For probe row 0:
+    //     (5, 0)
+    //     (4, 0)
+    //     (3, 0)
+    //
+    // After reversing both sets of indices, we obtain reversed indices:
+    //
+    //     (3,0)
+    //     (4,0)
+    //     (5,0)
+    //     (3,1)
+    //     (4,1)
+    //     (5,1)
+    //
+    // With this approach, the lexicographic order on both the probe side and 
the build side is preserved.
+    for (row, hash_value) in hash_values.iter().enumerate().rev() {
         // Get the hash and find it in the build index
 
         // For every item on the build and probe we check if it matches
@@ -779,6 +807,9 @@ pub fn build_equal_condition_join_indices(
             }
         }
     }
+    // Reversing both sets of indices
+    build_indices.as_slice_mut().reverse();
+    probe_indices.as_slice_mut().reverse();
 
     let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(), 
None);
     let right: UInt32Array = 
PrimitiveArray::new(probe_indices.finish().into(), None);
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
 
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
index fcc6d665c6..daeb7aad9a 100644
--- 
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
+++ 
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
@@ -63,18 +63,18 @@ GlobalLimitExec: skip=0, fetch=5
 --------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], 
output_ordering=[a@0 ASC NULLS LAST], has_header=true
 
 # preserve_inner_join
-query III nosort
-SELECT t1.a, t2.a as a2, t2.b
+query IIII nosort
+SELECT t1.a, t1.b, t1.c, t2.a as a2
  FROM annotated_data as t1
  INNER JOIN annotated_data as t2
  ON t1.d = t2.d ORDER BY a2, t2.b
  LIMIT 5
 ----
-1 0 0
-1 0 0
-1 0 0
-1 0 0
-1 0 0
+0 0 0 0
+0 0 2 0
+0 0 3 0
+0 0 6 0
+0 0 20 0
 
 query TT
 EXPLAIN SELECT t2.a as a2, t2.b

Reply via email to