This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fcd17c85c2 Minor: Encapsulate `LeftJoinData` into a struct (rather
than anonymous enum) and add comments (#8153)
fcd17c85c2 is described below
commit fcd17c85c2eba1c5c8d92beb52f4351286f2dcea
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Nov 14 03:12:04 2023 -0500
Minor: Encapsulate `LeftJoinData` into a struct (rather than anonymous
enum) and add comments (#8153)
* Minor: Encapsulate LeftJoinData into a struct (rather than anonymous enum)
* clippy
---
datafusion/physical-plan/src/joins/hash_join.rs | 72 ++++++++++++++++------
.../physical-plan/src/joins/hash_join_utils.rs | 9 ++-
2 files changed, 61 insertions(+), 20 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 546a929bf9..da57fa07cc 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -73,7 +73,47 @@ use datafusion_physical_expr::EquivalenceProperties;
use ahash::RandomState;
use futures::{ready, Stream, StreamExt, TryStreamExt};
-type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
+/// HashTable and input data for the left (build side) of a join
+struct JoinLeftData {
+ /// The hash table with indices into `batch`
+ hash_map: JoinHashMap,
+ /// The input rows for the build side
+ batch: RecordBatch,
+ /// Memory reservation that tracks memory used by the `hash_map` hash table
+ /// and `batch`. Cleared on drop.
+ #[allow(dead_code)]
+ reservation: MemoryReservation,
+}
+
+impl JoinLeftData {
+ /// Create a new `JoinLeftData` from its parts
+ fn new(
+ hash_map: JoinHashMap,
+ batch: RecordBatch,
+ reservation: MemoryReservation,
+ ) -> Self {
+ Self {
+ hash_map,
+ batch,
+ reservation,
+ }
+ }
+
+ /// Returns the number of rows in the build side
+ fn num_rows(&self) -> usize {
+ self.batch.num_rows()
+ }
+
+ /// Returns a reference to the hash map
+ fn hash_map(&self) -> &JoinHashMap {
+ &self.hash_map
+ }
+
+ /// Returns a reference to the build side batch
+ fn batch(&self) -> &RecordBatch {
+ &self.batch
+ }
+}
/// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
@@ -692,8 +732,9 @@ async fn collect_left_input(
// Merge all batches into a single batch, so we
// can directly index into the arrays
let single_batch = concat_batches(&schema, &batches, num_rows)?;
+ let data = JoinLeftData::new(hashmap, single_batch, reservation);
- Ok((hashmap, single_batch, reservation))
+ Ok(data)
}
/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
@@ -770,7 +811,7 @@ struct HashJoinStream {
left_fut: OnceFut<JoinLeftData>,
/// Which left (probe) side rows have been matches while creating output.
/// For some OUTER joins, we need to know which rows have not been matched
- /// to produce the correct.
+ /// to produce the correct output.
visited_left_side: Option<BooleanBufferBuilder>,
/// right (probe) input
right: SendableRecordBatchStream,
@@ -1042,13 +1083,13 @@ impl HashJoinStream {
{
// TODO: Replace `ceil` wrapper with stable `div_cell` after
// https://github.com/rust-lang/rust/issues/88581
- let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
+ let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
self.reservation.try_grow(visited_bitmap_size)?;
self.join_metrics.build_mem_used.add(visited_bitmap_size);
}
let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
- let num_rows = left_data.1.num_rows();
+ let num_rows = left_data.num_rows();
if need_produce_result_in_final(self.join_type) {
// Some join types need to track which row has be matched or unmatched:
// `left semi` join: need to use the bitmap to produce the matched row in the left side
@@ -1075,8 +1116,8 @@ impl HashJoinStream {
// get the matched two indices for the on condition
let left_right_indices =
build_equal_condition_join_indices(
- &left_data.0,
- &left_data.1,
+ left_data.hash_map(),
+ left_data.batch(),
&batch,
&self.on_left,
&self.on_right,
@@ -1108,7 +1149,7 @@ impl HashJoinStream {
let result = build_batch_from_indices(
&self.schema,
- &left_data.1,
+ left_data.batch(),
&batch,
&left_side,
&right_side,
@@ -1140,7 +1181,7 @@ impl HashJoinStream {
// use the left and right indices to produce the batch result
let result = build_batch_from_indices(
&self.schema,
- &left_data.1,
+ left_data.batch(),
&empty_right_batch,
&left_side,
&right_side,
@@ -2519,16 +2560,11 @@ mod tests {
("c", &vec![30, 40]),
);
- let left_data = (
- JoinHashMap {
- map: hashmap_left,
- next,
- },
- left,
- );
+ let join_hash_map = JoinHashMap::new(hashmap_left, next);
+
let (l, r) = build_equal_condition_join_indices(
- &left_data.0,
- &left_data.1,
+ &join_hash_map,
+ &left,
&right,
&[Column::new("a", 0)],
&[Column::new("a", 0)],
diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs
index 5ebf370b6d..fecbf96f08 100644
--- a/datafusion/physical-plan/src/joins/hash_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs
@@ -103,12 +103,17 @@ use hashbrown::HashSet;
/// ```
pub struct JoinHashMap {
// Stores hash value to last row index
- pub map: RawTable<(u64, u64)>,
+ map: RawTable<(u64, u64)>,
// Stores indices in chained list data structure
- pub next: Vec<u64>,
+ next: Vec<u64>,
}
impl JoinHashMap {
+ #[cfg(test)]
+ pub(crate) fn new(map: RawTable<(u64, u64)>, next: Vec<u64>) -> Self {
+ Self { map, next }
+ }
+
pub(crate) fn with_capacity(capacity: usize) -> Self {
JoinHashMap {
map: RawTable::with_capacity(capacity),