jackwener commented on code in PR #4377:
URL: https://github.com/apache/arrow-datafusion/pull/4377#discussion_r1032583319
##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1482,105 +1325,115 @@ impl HashJoinStream {
let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
- match self.join_type {
- JoinType::Left
- | JoinType::Full
- | JoinType::LeftSemi
- | JoinType::LeftAnti => {
- let mut buffer = BooleanBufferBuilder::new(num_rows);
-
- buffer.append_n(num_rows, false);
-
- buffer
- }
- JoinType::Inner
- | JoinType::Right
- | JoinType::RightSemi
- | JoinType::RightAnti => BooleanBufferBuilder::new(0),
+ if need_produce_result_in_final(self.join_type) {
+ // these join type need the bitmap to identify which row has
be matched or unmatched.
+ // For the `left semi` join, need to use the bitmap to produce
the matched row in the left side
+ // For the `left` join, need to use the bitmap to produce the
unmatched row in the left side with null
+ // For the `left anti` join, need to use the bitmap to produce
the unmatched row in the left side
+ // For the `full` join, need to use the bitmap to produce the
unmatched row in the left side with null
+ let mut buffer = BooleanBufferBuilder::new(num_rows);
+ buffer.append_n(num_rows, false);
+ buffer
+ } else {
+ BooleanBufferBuilder::new(0)
}
});
self.right
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
+ // one right batch in the join loop
Some(Ok(batch)) => {
+ self.join_metrics.input_batches.add(1);
+ self.join_metrics.input_rows.add(batch.num_rows());
let timer = self.join_metrics.join_time.timer();
- let result = build_batch(
+
+ // get the matched two indices for the on condition
+ let left_right_indices = build_join_indices(
&batch,
left_data,
&self.on_left,
&self.on_right,
&self.filter,
- self.join_type,
- &self.schema,
- &self.column_indices,
&self.random_state,
&self.null_equals_null,
);
- self.join_metrics.input_batches.add(1);
- self.join_metrics.input_rows.add(batch.num_rows());
- if let Ok((ref batch, ref left_side)) = result {
- timer.done();
- self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(batch.num_rows());
-
- match self.join_type {
- JoinType::Left
- | JoinType::Full
- | JoinType::LeftSemi
- | JoinType::LeftAnti => {
+
+ match left_right_indices {
+ Ok((left_side, right_side)) => {
+ timer.done();
+
+ // set the left bitmap
+ // and only left, full, left semi, left anti need
the left bitmap
+ if need_produce_result_in_final(self.join_type) {
left_side.iter().flatten().for_each(|x| {
visited_left_side.set_bit(x as usize,
true);
});
}
- JoinType::Inner
- | JoinType::Right
- | JoinType::RightSemi
- | JoinType::RightAnti => {}
+
+ // adjust the two side indices base on the join
type
+ let (left_side, right_side) =
adjust_indices_by_join_type(
+ left_side,
+ right_side,
+ batch.num_rows(),
+ self.join_type,
+ );
+
+ let result = build_batch_from_indices(
+ &self.schema,
+ &left_data.1,
+ &batch,
+ left_side,
+ right_side,
+ &self.column_indices,
+ );
+ self.join_metrics.output_batches.add(1);
+
self.join_metrics.output_rows.add(batch.num_rows());
+ Some(result)
+ }
+ Err(_) => {
+ // TODO why the type of result stream is
`Result<T, ArrowError>`, and not the `DataFusionError`
Review Comment:
Is this the same issue as #4172?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]