jackwener commented on code in PR #4377:
URL: https://github.com/apache/arrow-datafusion/pull/4377#discussion_r1032930615


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1441,44 +1181,147 @@ fn equal_rows(
     err.unwrap_or(Ok(res))
 }
 
-// Produces a batch for left-side rows that have/have not been matched during 
the whole join
-fn produce_from_matched(
-    visited_left_side: &BooleanBufferBuilder,
-    schema: &SchemaRef,
-    column_indices: &[ColumnIndex],
-    left_data: &JoinLeftData,
-    unmatched: bool,
-) -> ArrowResult<RecordBatch> {
-    let indices = if unmatched {
-        UInt64Array::from_iter_values(
-            (0..visited_left_side.len())
-                .filter_map(|v| (!visited_left_side.get_bit(v)).then_some(v as 
u64)),
-        )
+// The input is the matched indices for left and right.
+// Adjust the indices according to the join type
+fn adjust_indices_by_join_type(
+    left_indices: UInt64Array,
+    right_indices: UInt32Array,
+    count_right_batch: usize,
+    join_type: JoinType,
+) -> (UInt64Array, UInt32Array) {
+    match join_type {
+        JoinType::Inner => {
+            // matched
+            (left_indices, right_indices)
+        }
+        JoinType::Left => {
+            // matched
+            (left_indices, right_indices)
+            // unmatched left row will be produced in the end of loop, and it 
has been set in the left visited bitmap
+        }
+        JoinType::Right | JoinType::Full => {
+            // matched
+            // unmatched right row will be produced in this batch
+            let right_null_indices = get_anti_indices(count_right_batch, 
&right_indices);
+            // combine the matched and unmatched right result together
+            append_right_indices(left_indices, right_indices, 
right_null_indices)
+        }
+        JoinType::RightSemi => {
+            // need to remove the duplicated record in the right side
+            let right_indices = get_semi_indices(count_right_batch, 
&right_indices);
+            // the left_indices will not be used later for the `right semi` 
join
+            (left_indices, right_indices)
+        }
+        JoinType::RightAnti => {
+            // need to remove the duplicated record in the right side
+            // get the anti index for the right side
+            let right_indices = get_anti_indices(count_right_batch, 
&right_indices);
+            // the left_indices will not be used later for the `right anti` 
join
+            (left_indices, right_indices)
+        }
+        JoinType::LeftSemi | JoinType::LeftAnti => {
+            // matched or unmatched left row will be produced in the end of 
loop
+            (
+                UInt64Array::from_iter_values(vec![]),
+                UInt32Array::from_iter_values(vec![]),

Review Comment:
   🤔 I don't understand why this code is needed.



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -688,95 +683,77 @@ fn build_batch_from_indices(
         };
         columns.push(array);
     }
-    RecordBatch::try_new(Arc::new(schema.clone()), columns).map(|x| (x, 
left_indices))
+    RecordBatch::try_new(Arc::new(schema.clone()), columns)
 }
 
+// Get left and right indices which is satisfies the on condition in the Join
 #[allow(clippy::too_many_arguments)]
-fn build_batch(
+fn build_join_indices(
     batch: &RecordBatch,
     left_data: &JoinLeftData,
     on_left: &[Column],
     on_right: &[Column],
     filter: &Option<JoinFilter>,
-    join_type: JoinType,
-    schema: &Schema,
-    column_indices: &[ColumnIndex],
     random_state: &RandomState,
     null_equals_null: &bool,
-) -> ArrowResult<(RecordBatch, UInt64Array)> {
-    let (left_indices, right_indices) = build_join_indexes(
+) -> Result<(UInt64Array, UInt32Array)> {
+    // Get the indices which is satisfies the equal join condition, like 
`left.a1 = right.a2`
+    let (left_indices, right_indices) = build_equal_condition_join_indices(
         left_data,
         batch,
-        join_type,
         on_left,
         on_right,
         random_state,
         null_equals_null,
-    )
-    .unwrap();
-
-    let (left_filtered_indices, right_filtered_indices) = if let Some(filter) 
= filter {
-        apply_join_filter(
+    )?;
+    if let Some(filter) = filter {
+        // Filter the indices which is satisfies the non-equal join condition, 
like `left.b1 = 10`
+        apply_join_filter_to_indices(
             &left_data.1,
             batch,
-            join_type,
             left_indices,
             right_indices,
             filter,
         )
-        .unwrap()
     } else {
-        (left_indices, right_indices)
-    };
-
-    if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
-        return Ok((
-            RecordBatch::new_empty(Arc::new(schema.clone())),
-            left_filtered_indices,
-        ));
+        Ok((left_indices, right_indices))
     }
-
-    build_batch_from_indices(
-        schema,
-        &left_data.1,
-        batch,
-        left_filtered_indices,
-        right_filtered_indices,
-        column_indices,
-    )
 }
 
-/// returns a vector with (index from left, index from right).
-/// The size of this vector corresponds to the total size of a joined batch
-// For a join on column A:
-// left       right
-//     batch 1
-// A B         A D
-// ---------------
-// 1 a         3 6
-// 2 b         1 2
-// 3 c         2 4
-//     batch 2
-// A B         A D
-// ---------------
-// 1 a         5 10
-// 2 b         2 2
-// 4 d         1 1
-// indices (batch, batch_row)
-// left       right
-// (0, 2)     (0, 0)
-// (0, 0)     (0, 1)
-// (0, 1)     (0, 2)
-// (1, 0)     (0, 1)
-// (1, 1)     (0, 2)
-// (0, 1)     (1, 1)
-// (0, 0)     (1, 2)
-// (1, 1)     (1, 1)
-// (1, 0)     (1, 2)
-fn build_join_indexes(
+// return the index of equal condition join result: left_indices and 
right_indices
+// On LEFT.b1 = RIGHT.b2
+// LEFT Table:
+//  a1  b1  c1
+//  1   1   10
+//  3   3   30
+//  5   5   50
+//  7   7   70
+//  9   8   90
+//  11  8   110
+// 13   10  130
+// RIGHT Table:
+//  a2   b2  c2
+//  2    2   20
+//  4    4   40
+//  6    6   60
+//  8    8   80
+// 10   10  100
+// 12   10  120
+// The result is
+// "+----+----+-----+----+----+-----+",
+// "| a1 | b1 | c1  | a2 | b2 | c2  |",
+// "+----+----+-----+----+----+-----+",
+// "| 11 | 8  | 110 | 8  | 8  | 80  |",
+// "| 13 | 10 | 130 | 10 | 10 | 100 |",
+// "| 13 | 10 | 130 | 12 | 10 | 120 |",
+// "| 9  | 8  | 90  | 8  | 8  | 80  |",
+// "+----+----+-----+----+----+-----+"
+// And the result of left and right indices
+// left indices:  5,6,6,4
+// right indices: 3,4,5,3

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -688,95 +683,77 @@ fn build_batch_from_indices(
         };
         columns.push(array);
     }
-    RecordBatch::try_new(Arc::new(schema.clone()), columns).map(|x| (x, 
left_indices))
+    RecordBatch::try_new(Arc::new(schema.clone()), columns)
 }
 
+// Get left and right indices which is satisfies the on condition in the Join

Review Comment:
   It is easy to forget that the `filter` and the `equal-condition` are both part of the `on-condition`, and that they are implicitly combined with `AND`.
   ```suggestion
   // Get left and right indices which satisfy the on condition (including the equal condition and the filter in the join) in the Join
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to