alamb commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1452297783
##########
datafusion/physical-plan/src/joins/symmetric_hash_join.rs:
##########
@@ -835,6 +850,104 @@ pub(crate) fn join_with_probe_batch(
}
}
+/// This method performs lookups against JoinHashMap by hash values of
join-key columns, and handles potential
+/// hash collisions.
+///
+/// # Arguments
+///
+/// * `build_hashmap` - hashmap collected from build side data.
+/// * `build_batch` - Build side record batch.
+/// * `probe_batch` - Probe side record batch.
+/// * `build_on` - An array of columns on which the join will be performed.
The columns are from the build side of the join.
+/// * `probe_on` - An array of columns on which the join will be performed.
The columns are from the probe side of the join.
+/// * `random_state` - The random state for the join.
+/// * `null_equals_null` - A boolean indicating whether NULL values should be
treated as equal when joining.
+/// * `hashes_buffer` - Buffer used for probe side keys hash calculation.
+/// * `probe_batch` - The second record batch to be joined.
+/// * `column_indices` - An array of columns to be selected for the result of
the join.
+/// * `deleted_offset` - deleted offset for build side data.
Review Comment:
There appear to be some extra arguments documented that are not actually
present
```suggestion
/// * `build_hashmap` - hashmap collected from build side data.
/// * `build_batch` - Build side record batch.
/// * `probe_batch` - Probe side record batch.
/// * `build_on` - An array of columns on which the join will be performed.
The columns are from the build side of the join.
/// * `probe_on` - An array of columns on which the join will be performed.
The columns are from the probe side of the join.
/// * `random_state` - The random state for the join.
/// * `null_equals_null` - A boolean indicating whether NULL values should
be treated as equal when joining.
/// * `hashes_buffer` - Buffer used for probe side keys hash calculation.
/// * `deleted_offset` - deleted offset for build side data.
```
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1039,76 +1058,32 @@ pub fn build_equal_condition_join_indices<T:
JoinHashMapType>(
.into_array(build_input_buffer.num_rows())
})
.collect::<Result<Vec<_>>>()?;
- hashes_buffer.clear();
- hashes_buffer.resize(probe_batch.num_rows(), 0);
- let hash_values = create_hashes(&keys_values, random_state,
hashes_buffer)?;
- // In case build-side input has not been inverted while JoinHashMap
creation, the chained list algorithm
- // will return build indices for each probe row in a reverse order as such:
- // Build Indices: [5, 4, 3]
- // Probe Indices: [1, 1, 1]
- //
- // This affects the output sequence. Hypothetically, it's possible to
preserve the lexicographic order on the build side.
- // Let's consider probe rows [0,1] as an example:
- //
- // When the probe iteration sequence is reversed, the following pairings
can be derived:
- //
- // For probe row 1:
- // (5, 1)
- // (4, 1)
- // (3, 1)
- //
- // For probe row 0:
- // (5, 0)
- // (4, 0)
- // (3, 0)
- //
- // After reversing both sets of indices, we obtain reversed indices:
- //
- // (3,0)
- // (4,0)
- // (5,0)
- // (3,1)
- // (4,1)
- // (5,1)
- //
- // With this approach, the lexicographic order on both the probe side and
the build side is preserved.
- let (mut probe_indices, mut build_indices) = if fifo_hashmap {
- build_hashmap.get_matched_indices(hash_values.iter().enumerate(),
deleted_offset)
- } else {
- let (mut matched_probe, mut matched_build) = build_hashmap
- .get_matched_indices(hash_values.iter().enumerate().rev(),
deleted_offset);
-
- matched_probe.as_slice_mut().reverse();
- matched_build.as_slice_mut().reverse();
+ let mut hashes_buffer = vec![0; probe_batch.num_rows()];
Review Comment:
Maybe this is one source of potential slowdown -- the hashes buffer is
reallocated each time? Maybe we could change it to be reused as it was
previously?
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -908,23 +912,36 @@ enum HashJoinStreamState {
Completed,
}
-/// Container for HashJoinStreamState::ProcessProbeBatch related data
-struct ProcessProbeBatchState {
- /// Current probe-side batch
- batch: RecordBatch,
-}
-
impl HashJoinStreamState {
/// Tries to extract ProcessProbeBatchState from HashJoinStreamState enum.
/// Returns an error if state is not ProcessProbeBatchState.
- fn try_as_process_probe_batch(&self) -> Result<&ProcessProbeBatchState> {
+ fn try_as_process_probe_batch_mut(&mut self) -> Result<&mut
ProcessProbeBatchState> {
match self {
HashJoinStreamState::ProcessProbeBatch(state) => Ok(state),
_ => internal_err!("Expected hash join stream in ProcessProbeBatch
state"),
}
}
}
+/// Container for HashJoinStreamState::ProcessProbeBatch related data
+struct ProcessProbeBatchState {
+ /// Current probe-side batch
+ batch: RecordBatch,
+ /// Matching offset
Review Comment:
What does this match? The build side offset that is matched?
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1270,70 +1247,117 @@ impl HashJoinStream {
fn process_probe_batch(
&mut self,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- let state = self.state.try_as_process_probe_batch()?;
+ let state = self.state.try_as_process_probe_batch_mut()?;
let build_side = self.build_side.try_as_ready_mut()?;
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(state.batch.num_rows());
let timer = self.join_metrics.join_time.timer();
- let mut hashes_buffer = vec![];
- // get the matched two indices for the on condition
- let left_right_indices = build_equal_condition_join_indices(
+ // get the matched by join keys indices
+ let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
build_side.left_data.batch(),
&state.batch,
&self.on_left,
&self.on_right,
&self.random_state,
self.null_equals_null,
- &mut hashes_buffer,
- self.filter.as_ref(),
- JoinSide::Left,
- None,
- true,
- );
+ self.batch_size,
+ state.offset,
+ )?;
- let result = match left_right_indices {
- Ok((left_side, right_side)) => {
- // set the left bitmap
- // and only left, full, left semi, left anti need the left
bitmap
- if need_produce_result_in_final(self.join_type) {
- left_side.iter().flatten().for_each(|x| {
- build_side.visited_left_side.set_bit(x as usize, true);
- });
- }
+ // apply join filter if exists
+ let (left_indices, right_indices) = if let Some(filter) = &self.filter
{
Review Comment:
I think your changes to this code have made it easier to read. Thank you
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -968,7 +987,10 @@ impl RecordBatchStream for HashJoinStream {
}
}
-/// Returns build/probe indices satisfying the equality condition.
+/// Executes lookups by hash against JoinHashMap and resolves potential
+/// hash collisions.
+/// Returns build/probe indices satisfying the equality condition, along with
+/// starting point for next iteration.
Review Comment:
```suggestion
/// (optional) starting point for next iteration.
```
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1270,70 +1247,117 @@ impl HashJoinStream {
fn process_probe_batch(
&mut self,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- let state = self.state.try_as_process_probe_batch()?;
+ let state = self.state.try_as_process_probe_batch_mut()?;
let build_side = self.build_side.try_as_ready_mut()?;
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(state.batch.num_rows());
let timer = self.join_metrics.join_time.timer();
- let mut hashes_buffer = vec![];
- // get the matched two indices for the on condition
- let left_right_indices = build_equal_condition_join_indices(
+ // get the matched by join keys indices
+ let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
build_side.left_data.batch(),
&state.batch,
&self.on_left,
&self.on_right,
&self.random_state,
self.null_equals_null,
- &mut hashes_buffer,
- self.filter.as_ref(),
- JoinSide::Left,
- None,
- true,
- );
+ self.batch_size,
+ state.offset,
+ )?;
- let result = match left_right_indices {
- Ok((left_side, right_side)) => {
- // set the left bitmap
- // and only left, full, left semi, left anti need the left
bitmap
- if need_produce_result_in_final(self.join_type) {
- left_side.iter().flatten().for_each(|x| {
- build_side.visited_left_side.set_bit(x as usize, true);
- });
- }
+ // apply join filter if exists
+ let (left_indices, right_indices) = if let Some(filter) = &self.filter
{
+ apply_join_filter_to_indices(
+ build_side.left_data.batch(),
+ &state.batch,
+ left_indices,
+ right_indices,
+ filter,
+ JoinSide::Left,
+ )?
+ } else {
+ (left_indices, right_indices)
+ };
- // adjust the two side indices base on the join type
- let (left_side, right_side) = adjust_indices_by_join_type(
- left_side,
- right_side,
- state.batch.num_rows(),
- self.join_type,
- );
+ // mark joined left-side indices as visited, if required by join type
+ if need_produce_result_in_final(self.join_type) {
+ left_indices.iter().flatten().for_each(|x| {
+ build_side.visited_left_side.set_bit(x as usize, true);
+ });
+ }
- let result = build_batch_from_indices(
- &self.schema,
- build_side.left_data.batch(),
- &state.batch,
- &left_side,
- &right_side,
- &self.column_indices,
- JoinSide::Left,
- );
- self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(state.batch.num_rows());
- result
- }
- Err(err) => {
- exec_err!("Fail to build join indices in HashJoinExec,
error:{err}")
- }
+ // check if probe batch scanned based on `next_offset` returned from
lookup function
+ let probe_batch_scanned = next_offset.is_none()
+ || next_offset.is_some_and(|(probe_idx, build_idx)| {
+ probe_idx + 1 >= state.batch.num_rows()
+ && build_idx.is_some_and(|v| v == 0)
+ });
+
+ // The goals of index alignment for different join types are:
+ //
+ // 1) Right & FullJoin -- to append all missing probe-side indices
between
+ // previous (excluding) and current joined indices.
+ // 2) SemiJoin -- deduplicate probe indices in range between previous
+ // (excluding) and current joined indices.
+ // 3) AntiJoin -- return only missing indices in range between
+ // previous and current joined indices.
+ // Inclusion/exclusion of the indices themselves don't matter
+ //
+ // As a summary -- alignment range can be produced based only on
+ // joined (matched with filters applied) probe side indices, excluding
starting one
+ // (left from previous iteration).
+
+ // if any rows have been joined -- get last joined probe-side (right)
row
+ // it's important that index counts as "joined" after hash collisions
checks
+ // and join filters applied.
+ let last_joined_right_idx = match right_indices.len() {
+ 0 => None,
+ n => Some(right_indices.value(n - 1) as usize),
+ };
+
+ // Calculate range and perform alignment.
+ // In case probe batch has been processed -- align all remaining rows.
+ let index_alignment_range_start = state.joined_probe_idx.map_or(0, |v|
v + 1);
+ let index_alignment_range_end = if probe_batch_scanned {
+ state.batch.num_rows()
+ } else {
+ last_joined_right_idx.map_or(0, |v| v + 1)
};
+
+ let (left_indices, right_indices) = adjust_indices_by_join_type(
+ left_indices,
+ right_indices,
+ index_alignment_range_start..index_alignment_range_end,
+ self.join_type,
+ );
+
+ let result = build_batch_from_indices(
+ &self.schema,
+ build_side.left_data.batch(),
+ &state.batch,
+ &left_indices,
+ &right_indices,
+ &self.column_indices,
+ JoinSide::Left,
+ )?;
+
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(state.batch.num_rows());
Review Comment:
Shouldn't the output rows be the number of rows in `result` rather than the
entire batch? If it is the entire batch the output rows will be counted
multiple times I think
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]