ygf11 commented on code in PR #5156: URL: https://github.com/apache/arrow-datafusion/pull/5156#discussion_r1094198531
########## datafusion/core/src/physical_plan/joins/nested_loop_join.rs: ########## @@ -479,6 +404,185 @@ impl NestedLoopJoinStream { } }) } + + fn poll_next_impl_for_build_right( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Result<RecordBatch>>> { + // all right row + let right_data = match ready!(self.inner_table.get(cx)) { + Ok(data) => data, + Err(e) => return Poll::Ready(Some(Err(e))), + }; + + // for build right, bitmap is not needed. + let mut empty_visited_left_side = BooleanBufferBuilder::new(0); + self.outer_table + .poll_next_unpin(cx) + .map(|maybe_batch| match maybe_batch { + Some(Ok(left_batch)) => { + let result = join_left_and_right_batch( + &left_batch, + right_data, + self.join_type, + self.filter.as_ref(), + &self.column_indices, + &self.schema, + &mut empty_visited_left_side, + ); + Some(result) + } + Some(err) => Some(err), + None => None, + }) + } +} + +fn join_left_and_right_batch( + left_batch: &RecordBatch, + right_batch: &RecordBatch, + join_type: JoinType, + filter: Option<&JoinFilter>, + column_indices: &[ColumnIndex], + schema: &Schema, + visited_left_side: &mut BooleanBufferBuilder, +) -> Result<RecordBatch> { + let indices_result = (0..left_batch.num_rows()) + .map(|left_row_index| { + build_join_indices(left_row_index, right_batch, left_batch, filter) + }) + .collect::<Result<Vec<(UInt64Array, UInt32Array)>>>(); + + let mut left_indices_builder = UInt64Builder::new(); + let mut right_indices_builder = UInt32Builder::new(); + let left_right_indices = match indices_result { + Err(_) => Err(DataFusionError::Execution( + "Build left right indices error".to_string(), + )), + Ok(indices) => { + for (left_side, right_side) in indices { + left_indices_builder + .append_values(left_side.values(), &vec![true; left_side.len()]); + right_indices_builder + .append_values(right_side.values(), &vec![true; right_side.len()]); + } + Ok(( + left_indices_builder.finish(), + right_indices_builder.finish(), + )) + } + }; + match 
left_right_indices { + Ok((left_side, right_side)) => { + // set the left bitmap + // and only full join need the left bitmap + if join_type == JoinType::Full { + left_side.iter().flatten().for_each(|x| { + visited_left_side.set_bit(x as usize, true); + }); + } + // adjust the two side indices base on the join type + let (left_side, right_side) = adjust_indices_by_join_type( + left_side, + right_side, + left_batch.num_rows(), + right_batch.num_rows(), + join_type, + ); + + build_batch_from_indices( + schema, + left_batch, + right_batch, + left_side, + right_side, + column_indices, + ) + } + Err(e) => Err(e), + } +} + +fn adjust_indices_by_join_type( + left_indices: UInt64Array, + right_indices: UInt32Array, + count_left_batch: usize, + count_right_batch: usize, + join_type: JoinType, +) -> (UInt64Array, UInt32Array) { + match join_type { + JoinType::Inner => (left_indices, right_indices), + JoinType::Left => { + // matched + // unmatched left row will be produced in this batch + let left_unmatched_indices = + get_anti_u64_indices(count_left_batch, &left_indices); + // combine the matched and unmatched left result together + append_left_indices(left_indices, right_indices, left_unmatched_indices) + } + JoinType::LeftSemi => { + // need to remove the duplicated record in the left side + let left_indices = get_semi_u64_indices(count_left_batch, &left_indices); + // the right_indices will not be used later for the `left semi` join + (left_indices, right_indices) + } + JoinType::LeftAnti => { + // need to remove the duplicated record in the left side + // get the anti index for the left side + let left_indices = get_anti_u64_indices(count_left_batch, &left_indices); + // the right_indices will not be used later for the `left anti` join + (left_indices, right_indices) + } Review Comment: Left/LeftSemi/LeftAnti joins do not need the bitmap because the right table is a single partition. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org