korowa commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1452688577
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -908,23 +912,36 @@ enum HashJoinStreamState {
Completed,
}
-/// Container for HashJoinStreamState::ProcessProbeBatch related data
-struct ProcessProbeBatchState {
- /// Current probe-side batch
- batch: RecordBatch,
-}
-
impl HashJoinStreamState {
/// Tries to extract ProcessProbeBatchState from HashJoinStreamState enum.
/// Returns an error if state is not ProcessProbeBatchState.
- fn try_as_process_probe_batch(&self) -> Result<&ProcessProbeBatchState> {
+ fn try_as_process_probe_batch_mut(&mut self) -> Result<&mut
ProcessProbeBatchState> {
match self {
HashJoinStreamState::ProcessProbeBatch(state) => Ok(state),
_ => internal_err!("Expected hash join stream in ProcessProbeBatch
state"),
}
}
}
+/// Container for HashJoinStreamState::ProcessProbeBatch related data
+struct ProcessProbeBatchState {
+ /// Current probe-side batch
+ batch: RecordBatch,
+ /// Matching offset
Review Comment:
I've updated the comment -- it now indicates that this attribute is used as
the starting offset for lookups against the join hashmap.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1270,70 +1247,117 @@ impl HashJoinStream {
fn process_probe_batch(
&mut self,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- let state = self.state.try_as_process_probe_batch()?;
+ let state = self.state.try_as_process_probe_batch_mut()?;
let build_side = self.build_side.try_as_ready_mut()?;
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(state.batch.num_rows());
let timer = self.join_metrics.join_time.timer();
- let mut hashes_buffer = vec![];
- // get the matched two indices for the on condition
- let left_right_indices = build_equal_condition_join_indices(
+ // get the matched by join keys indices
+ let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
build_side.left_data.batch(),
&state.batch,
&self.on_left,
&self.on_right,
&self.random_state,
self.null_equals_null,
- &mut hashes_buffer,
- self.filter.as_ref(),
- JoinSide::Left,
- None,
- true,
- );
+ self.batch_size,
+ state.offset,
+ )?;
- let result = match left_right_indices {
- Ok((left_side, right_side)) => {
- // set the left bitmap
- // and only left, full, left semi, left anti need the left
bitmap
- if need_produce_result_in_final(self.join_type) {
- left_side.iter().flatten().for_each(|x| {
- build_side.visited_left_side.set_bit(x as usize, true);
- });
- }
+ // apply join filter if exists
+ let (left_indices, right_indices) = if let Some(filter) = &self.filter
{
+ apply_join_filter_to_indices(
+ build_side.left_data.batch(),
+ &state.batch,
+ left_indices,
+ right_indices,
+ filter,
+ JoinSide::Left,
+ )?
+ } else {
+ (left_indices, right_indices)
+ };
- // adjust the two side indices base on the join type
- let (left_side, right_side) = adjust_indices_by_join_type(
- left_side,
- right_side,
- state.batch.num_rows(),
- self.join_type,
- );
+ // mark joined left-side indices as visited, if required by join type
+ if need_produce_result_in_final(self.join_type) {
+ left_indices.iter().flatten().for_each(|x| {
+ build_side.visited_left_side.set_bit(x as usize, true);
+ });
+ }
- let result = build_batch_from_indices(
- &self.schema,
- build_side.left_data.batch(),
- &state.batch,
- &left_side,
- &right_side,
- &self.column_indices,
- JoinSide::Left,
- );
- self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(state.batch.num_rows());
- result
- }
- Err(err) => {
- exec_err!("Fail to build join indices in HashJoinExec,
error:{err}")
- }
+ // check if probe batch scanned based on `next_offset` returned from
lookup function
+ let probe_batch_scanned = next_offset.is_none()
+ || next_offset.is_some_and(|(probe_idx, build_idx)| {
+ probe_idx + 1 >= state.batch.num_rows()
+ && build_idx.is_some_and(|v| v == 0)
+ });
+
+ // The goals of index alignment for different join types are:
+ //
+ // 1) Right & FullJoin -- to append all missing probe-side indices
between
+ // previous (excluding) and current joined indices.
+ // 2) SemiJoin -- deduplicate probe indices in range between previous
+ // (excluding) and current joined indices.
+ // 3) AntiJoin -- return only missing indices in range between
+ // previous and current joined indices.
+ // Inclusion/exclusion of the indices themselves don't matter
+ //
+ // As a summary -- alignment range can be produced based only on
+ // joined (matched with filters applied) probe side indices, excluding
starting one
+ // (left from previous iteration).
+
+ // if any rows have been joined -- get last joined probe-side (right)
row
+ // it's important that index counts as "joined" after hash collisions
checks
+ // and join filters applied.
+ let last_joined_right_idx = match right_indices.len() {
+ 0 => None,
+ n => Some(right_indices.value(n - 1) as usize),
+ };
+
+ // Calculate range and perform alignment.
+ // In case probe batch has been processed -- align all remaining rows.
+ let index_alignment_range_start = state.joined_probe_idx.map_or(0, |v|
v + 1);
+ let index_alignment_range_end = if probe_batch_scanned {
+ state.batch.num_rows()
+ } else {
+ last_joined_right_idx.map_or(0, |v| v + 1)
};
+
+ let (left_indices, right_indices) = adjust_indices_by_join_type(
+ left_indices,
+ right_indices,
+ index_alignment_range_start..index_alignment_range_end,
+ self.join_type,
+ );
+
+ let result = build_batch_from_indices(
+ &self.schema,
+ build_side.left_data.batch(),
+ &state.batch,
+ &left_indices,
+ &right_indices,
+ &self.column_indices,
+ JoinSide::Left,
+ )?;
+
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(state.batch.num_rows());
Review Comment:
It definitely should, thank you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org