This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1e9816b07e HashJoin order fixing (#7155)
1e9816b07e is described below
commit 1e9816b07edd8e73bb45ababa20a7c22de492d96
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Wed Aug 2 10:35:28 2023 +0300
HashJoin order fixing (#7155)
* Order fixing
* Update join_disable_repartition_joins.slt
* Update hash_join.rs
* Update hash_join.rs
* Update hash_join.rs
---
.../core/src/physical_plan/joins/hash_join.rs | 41 +++++++++++++++++++---
.../test_files/join_disable_repartition_joins.slt | 14 ++++----
2 files changed, 43 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 6159a64be3..b97f2806e6 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -716,14 +716,14 @@ impl RecordBatchStream for HashJoinStream {
// "+----+----+-----+----+----+-----+",
// "| a1 | b1 | c1 | a2 | b2 | c2 |",
// "+----+----+-----+----+----+-----+",
+// "| 9 | 8 | 90 | 8 | 8 | 80 |",
// "| 11 | 8 | 110 | 8 | 8 | 80 |",
// "| 13 | 10 | 130 | 10 | 10 | 100 |",
// "| 13 | 10 | 130 | 12 | 10 | 120 |",
-// "| 9 | 8 | 90 | 8 | 8 | 80 |",
// "+----+----+-----+----+----+-----+"
// And the result of build and probe indices are:
-// Build indices: 5, 6, 6, 4
-// Probe indices: 3, 4, 5, 3
+// Build indices: 4, 5, 6, 6
+// Probe indices: 3, 3, 4, 5
#[allow(clippy::too_many_arguments)]
pub fn build_equal_condition_join_indices(
build_hashmap: &JoinHashMap,
@@ -754,8 +754,36 @@ pub fn build_equal_condition_join_indices(
// Using a buffer builder to avoid slower normal builder
let mut build_indices = UInt64BufferBuilder::new(0);
let mut probe_indices = UInt32BufferBuilder::new(0);
- // Visit all of the probe rows
- for (row, hash_value) in hash_values.iter().enumerate() {
+ // The chained list algorithm generates the build indices for each probe row in reverse order, for example:
+ // Build Indices: [5, 4, 3]
+ // Probe Indices: [1, 1, 1]
+ //
+ // This reverses the build-side order in the output. The lexicographic order on the build side can nevertheless be preserved, as follows.
+ // Let's consider probe rows [0,1] as an example:
+ //
+ // When the probe rows are iterated in reverse order, the following pairings are produced:
+ //
+ // For probe row 1:
+ // (5, 1)
+ // (4, 1)
+ // (3, 1)
+ //
+ // For probe row 0:
+ // (5, 0)
+ // (4, 0)
+ // (3, 0)
+ //
+ // After reversing both sets of indices, we obtain reversed indices:
+ //
+ // (3,0)
+ // (4,0)
+ // (5,0)
+ // (3,1)
+ // (4,1)
+ // (5,1)
+ //
+ // With this approach, the lexicographic order on both the probe side and the build side is preserved.
+ for (row, hash_value) in hash_values.iter().enumerate().rev() {
// Get the hash and find it in the build index
// For every item on the build and probe we check if it matches
@@ -779,6 +807,9 @@ pub fn build_equal_condition_join_indices(
}
}
}
+ // Reversing both sets of indices
+ build_indices.as_slice_mut().reverse();
+ probe_indices.as_slice_mut().reverse();
let left: UInt64Array = PrimitiveArray::new(build_indices.finish().into(),
None);
let right: UInt32Array =
PrimitiveArray::new(probe_indices.finish().into(), None);
diff --git a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
index fcc6d665c6..daeb7aad9a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
@@ -63,18 +63,18 @@ GlobalLimitExec: skip=0, fetch=5
--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
# preserve_inner_join
-query III nosort
-SELECT t1.a, t2.a as a2, t2.b
+query IIII nosort
+SELECT t1.a, t1.b, t1.c, t2.a as a2
FROM annotated_data as t1
INNER JOIN annotated_data as t2
ON t1.d = t2.d ORDER BY a2, t2.b
LIMIT 5
----
-1 0 0
-1 0 0
-1 0 0
-1 0 0
-1 0 0
+0 0 0 0
+0 0 2 0
+0 0 3 0
+0 0 6 0
+0 0 20 0
query TT
EXPLAIN SELECT t2.a as a2, t2.b