jackwener commented on code in PR #4377:
URL: https://github.com/apache/arrow-datafusion/pull/4377#discussion_r1032930615
##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1441,44 +1181,147 @@ fn equal_rows(
err.unwrap_or(Ok(res))
}
-// Produces a batch for left-side rows that have/have not been matched during
the whole join
-fn produce_from_matched(
- visited_left_side: &BooleanBufferBuilder,
- schema: &SchemaRef,
- column_indices: &[ColumnIndex],
- left_data: &JoinLeftData,
- unmatched: bool,
-) -> ArrowResult<RecordBatch> {
- let indices = if unmatched {
- UInt64Array::from_iter_values(
- (0..visited_left_side.len())
- .filter_map(|v| (!visited_left_side.get_bit(v)).then_some(v as
u64)),
- )
+// The input is the matched indices for left and right.
+// Adjust the indices according to the join type
+fn adjust_indices_by_join_type(
+ left_indices: UInt64Array,
+ right_indices: UInt32Array,
+ count_right_batch: usize,
+ join_type: JoinType,
+) -> (UInt64Array, UInt32Array) {
+ match join_type {
+ JoinType::Inner => {
+ // matched
+ (left_indices, right_indices)
+ }
+ JoinType::Left => {
+ // matched
+ (left_indices, right_indices)
+ // unmatched left row will be produced in the end of loop, and it
has been set in the left visited bitmap
+ }
+ JoinType::Right | JoinType::Full => {
+ // matched
+ // unmatched right row will be produced in this batch
+ let right_null_indices = get_anti_indices(count_right_batch,
&right_indices);
+ // combine the matched and unmatched right result together
+ append_right_indices(left_indices, right_indices,
right_null_indices)
+ }
+ JoinType::RightSemi => {
+ // need to remove the duplicated record in the right side
+ let right_indices = get_semi_indices(count_right_batch,
&right_indices);
+ // the left_indices will not be used later for the `right semi`
join
+ (left_indices, right_indices)
+ }
+ JoinType::RightAnti => {
+ // need to remove the duplicated record in the right side
+ // get the anti index for the right side
+ let right_indices = get_anti_indices(count_right_batch,
&right_indices);
+ // the left_indices will not be used later for the `right anti`
join
+ (left_indices, right_indices)
+ }
+ JoinType::LeftSemi | JoinType::LeftAnti => {
+ // matched or unmatched left row will be produced in the end of
loop
+ (
+ UInt64Array::from_iter_values(vec![]),
+ UInt32Array::from_iter_values(vec![]),
Review Comment:
🤔 I don't understand why this code is needed here.
##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -688,95 +683,77 @@ fn build_batch_from_indices(
};
columns.push(array);
}
- RecordBatch::try_new(Arc::new(schema.clone()), columns).map(|x| (x,
left_indices))
+ RecordBatch::try_new(Arc::new(schema.clone()), columns)
}
+// Get left and right indices which is satisfies the on condition in the Join
#[allow(clippy::too_many_arguments)]
-fn build_batch(
+fn build_join_indices(
batch: &RecordBatch,
left_data: &JoinLeftData,
on_left: &[Column],
on_right: &[Column],
filter: &Option<JoinFilter>,
- join_type: JoinType,
- schema: &Schema,
- column_indices: &[ColumnIndex],
random_state: &RandomState,
null_equals_null: &bool,
-) -> ArrowResult<(RecordBatch, UInt64Array)> {
- let (left_indices, right_indices) = build_join_indexes(
+) -> Result<(UInt64Array, UInt32Array)> {
+ // Get the indices which is satisfies the equal join condition, like
`left.a1 = right.a2`
+ let (left_indices, right_indices) = build_equal_condition_join_indices(
left_data,
batch,
- join_type,
on_left,
on_right,
random_state,
null_equals_null,
- )
- .unwrap();
-
- let (left_filtered_indices, right_filtered_indices) = if let Some(filter)
= filter {
- apply_join_filter(
+ )?;
+ if let Some(filter) = filter {
+ // Filter the indices which is satisfies the non-equal join condition,
like `left.b1 = 10`
+ apply_join_filter_to_indices(
&left_data.1,
batch,
- join_type,
left_indices,
right_indices,
filter,
)
- .unwrap()
} else {
- (left_indices, right_indices)
- };
-
- if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
- return Ok((
- RecordBatch::new_empty(Arc::new(schema.clone())),
- left_filtered_indices,
- ));
+ Ok((left_indices, right_indices))
}
-
- build_batch_from_indices(
- schema,
- &left_data.1,
- batch,
- left_filtered_indices,
- right_filtered_indices,
- column_indices,
- )
}
-/// returns a vector with (index from left, index from right).
-/// The size of this vector corresponds to the total size of a joined batch
-// For a join on column A:
-// left right
-// batch 1
-// A B A D
-// ---------------
-// 1 a 3 6
-// 2 b 1 2
-// 3 c 2 4
-// batch 2
-// A B A D
-// ---------------
-// 1 a 5 10
-// 2 b 2 2
-// 4 d 1 1
-// indices (batch, batch_row)
-// left right
-// (0, 2) (0, 0)
-// (0, 0) (0, 1)
-// (0, 1) (0, 2)
-// (1, 0) (0, 1)
-// (1, 1) (0, 2)
-// (0, 1) (1, 1)
-// (0, 0) (1, 2)
-// (1, 1) (1, 1)
-// (1, 0) (1, 2)
-fn build_join_indexes(
+// return the index of equal condition join result: left_indices and
right_indices
+// On LEFT.b1 = RIGHT.b2
+// LEFT Table:
+// a1 b1 c1
+// 1 1 10
+// 3 3 30
+// 5 5 50
+// 7 7 70
+// 9 8 90
+// 11 8 110
+// 13 10 130
+// RIGHT Table:
+// a2 b2 c2
+// 2 2 20
+// 4 4 40
+// 6 6 60
+// 8 8 80
+// 10 10 100
+// 12 10 120
+// The result is
+// "+----+----+-----+----+----+-----+",
+// "| a1 | b1 | c1 | a2 | b2 | c2 |",
+// "+----+----+-----+----+----+-----+",
+// "| 11 | 8 | 110 | 8 | 8 | 80 |",
+// "| 13 | 10 | 130 | 10 | 10 | 100 |",
+// "| 13 | 10 | 130 | 12 | 10 | 120 |",
+// "| 9 | 8 | 90 | 8 | 8 | 80 |",
+// "+----+----+-----+----+----+-----+"
+// And the result of left and right indices
+// left indices: 5,6,6,4
+// right indices: 3,4,5,3
Review Comment:
👍
##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -688,95 +683,77 @@ fn build_batch_from_indices(
};
columns.push(array);
}
- RecordBatch::try_new(Arc::new(schema.clone()), columns).map(|x| (x,
left_indices))
+ RecordBatch::try_new(Arc::new(schema.clone()), columns)
}
+// Get left and right indices which is satisfies the on condition in the Join
Review Comment:
It is easy to forget that the `filter` and the `equal-condition` are both part of the
`on-condition`, and that they are implicitly combined with `AND`.
```suggestion
// Get the left and right indices that satisfy the on condition (including equal_condition and filter_in_join) in the Join
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]