metesynnada commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1455734159
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -665,6 +668,8 @@ impl ExecutionPlan for HashJoinExec {
reservation,
state: HashJoinStreamState::WaitBuildSide,
build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
+ batch_size,
Review Comment:
I am not comfortable using `batch_size` as the limit, since it does not take
memory consumption into account. However, I am not sure how to pick a good heuristic value.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1270,70 +1252,118 @@ impl HashJoinStream {
fn process_probe_batch(
&mut self,
) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
- let state = self.state.try_as_process_probe_batch()?;
+ let state = self.state.try_as_process_probe_batch_mut()?;
let build_side = self.build_side.try_as_ready_mut()?;
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(state.batch.num_rows());
let timer = self.join_metrics.join_time.timer();
- let mut hashes_buffer = vec![];
- // get the matched two indices for the on condition
- let left_right_indices = build_equal_condition_join_indices(
+ // get the matched by join keys indices
+ let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
build_side.left_data.hash_map(),
build_side.left_data.batch(),
&state.batch,
&self.on_left,
&self.on_right,
&self.random_state,
self.null_equals_null,
- &mut hashes_buffer,
- self.filter.as_ref(),
- JoinSide::Left,
- None,
- true,
- );
+ &mut self.hashes_buffer,
+ self.batch_size,
+ state.offset,
+ )?;
- let result = match left_right_indices {
- Ok((left_side, right_side)) => {
- // set the left bitmap
- // and only left, full, left semi, left anti need the left
bitmap
- if need_produce_result_in_final(self.join_type) {
- left_side.iter().flatten().for_each(|x| {
- build_side.visited_left_side.set_bit(x as usize, true);
- });
- }
+ // apply join filter if exists
+ let (left_indices, right_indices) = if let Some(filter) = &self.filter
{
+ apply_join_filter_to_indices(
+ build_side.left_data.batch(),
+ &state.batch,
+ left_indices,
+ right_indices,
+ filter,
+ JoinSide::Left,
+ )?
+ } else {
+ (left_indices, right_indices)
+ };
- // adjust the two side indices base on the join type
- let (left_side, right_side) = adjust_indices_by_join_type(
- left_side,
- right_side,
- state.batch.num_rows(),
- self.join_type,
- );
+ // mark joined left-side indices as visited, if required by join type
+ if need_produce_result_in_final(self.join_type) {
+ left_indices.iter().flatten().for_each(|x| {
+ build_side.visited_left_side.set_bit(x as usize, true);
+ });
+ }
- let result = build_batch_from_indices(
- &self.schema,
- build_side.left_data.batch(),
- &state.batch,
- &left_side,
- &right_side,
- &self.column_indices,
- JoinSide::Left,
- );
- self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(state.batch.num_rows());
- result
- }
- Err(err) => {
- exec_err!("Fail to build join indices in HashJoinExec,
error:{err}")
- }
+ // check if probe batch scanned based on `next_offset` returned from
lookup function
+ let probe_batch_scanned = next_offset.is_none()
+ || next_offset.is_some_and(|(probe_idx, build_idx)| {
+ probe_idx + 1 >= state.batch.num_rows()
+ && build_idx.is_some_and(|v| v == 0)
+ });
+
+ // The goals of index alignment for different join types are:
+ //
+ // 1) Right & FullJoin -- to append all missing probe-side indices
between
+ // previous (excluding) and current joined indices.
+ // 2) SemiJoin -- deduplicate probe indices in range between previous
+ // (excluding) and current joined indices.
+ // 3) AntiJoin -- return only missing indices in range between
+ // previous and current joined indices.
+ // Inclusion/exclusion of the indices themselves don't matter
+ //
+ // As a summary -- alignment range can be produced based only on
+ // joined (matched with filters applied) probe side indices, excluding
starting one
+ // (left from previous iteration).
+
+ // if any rows have been joined -- get last joined probe-side (right)
row
+ // it's important that index counts as "joined" after hash collisions
checks
+ // and join filters applied.
+ let last_joined_right_idx = match right_indices.len() {
+ 0 => None,
+ n => Some(right_indices.value(n - 1) as usize),
+ };
+
+ // Calculate range and perform alignment.
+ // In case probe batch has been processed -- align all remaining rows.
+ let index_alignment_range_start = state.joined_probe_idx.map_or(0, |v|
v + 1);
+ let index_alignment_range_end = if probe_batch_scanned {
+ state.batch.num_rows()
+ } else {
+ last_joined_right_idx.map_or(0, |v| v + 1)
};
+
+ let (left_indices, right_indices) = adjust_indices_by_join_type(
+ left_indices,
+ right_indices,
+ index_alignment_range_start..index_alignment_range_end,
+ self.join_type,
+ );
+
+ let result = build_batch_from_indices(
+ &self.schema,
+ build_side.left_data.batch(),
+ &state.batch,
+ &left_indices,
+ &right_indices,
+ &self.column_indices,
+ JoinSide::Left,
+ )?;
+
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(result.num_rows());
timer.done();
- self.state = HashJoinStreamState::FetchProbeBatch;
+ if probe_batch_scanned {
Review Comment:
For state machine handling, please take a look at the `EagerJoinStream` trait.
It encapsulates all build-side and probe-side data fetching and state management,
so state handling is clearly separated from the join calculations (which live
in the trait implementors). This makes the code much easier to read. You could
follow a similar layered approach to `EagerJoinStream` and SHJ: the method that
calculates the join should not alter the stream state.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1023,11 +1048,9 @@ pub fn build_equal_condition_join_indices<T:
JoinHashMapType>(
random_state: &RandomState,
null_equals_null: bool,
hashes_buffer: &mut Vec<u64>,
- filter: Option<&JoinFilter>,
- build_side: JoinSide,
- deleted_offset: Option<usize>,
- fifo_hashmap: bool,
-) -> Result<(UInt64Array, UInt32Array)> {
+ limit: usize,
+ offset: JoinHashMapOffset,
+) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
let keys_values = probe_on
Review Comment:
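I believe the probe-side hashes are recalculated unnecessarily on every call while
the probe batch has not yet been fully processed. To improve efficiency, the hashes
could be computed once per probe batch and reused, with the offset used to resume the lookup.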
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]