korowa commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1452688577


##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -908,23 +912,36 @@ enum HashJoinStreamState {
     Completed,
 }
 
-/// Container for HashJoinStreamState::ProcessProbeBatch related data
-struct ProcessProbeBatchState {
-    /// Current probe-side batch
-    batch: RecordBatch,
-}
-
 impl HashJoinStreamState {
     /// Tries to extract ProcessProbeBatchState from HashJoinStreamState enum.
     /// Returns an error if state is not ProcessProbeBatchState.
-    fn try_as_process_probe_batch(&self) -> Result<&ProcessProbeBatchState> {
+    fn try_as_process_probe_batch_mut(&mut self) -> Result<&mut 
ProcessProbeBatchState> {
         match self {
             HashJoinStreamState::ProcessProbeBatch(state) => Ok(state),
             _ => internal_err!("Expected hash join stream in ProcessProbeBatch 
state"),
         }
     }
 }
 
+/// Container for HashJoinStreamState::ProcessProbeBatch related data
+struct ProcessProbeBatchState {
+    /// Current probe-side batch
+    batch: RecordBatch,
+    /// Matching offset

Review Comment:
   I've updated comment -- now it indicates that this attribute is used as a 
starting offset for lookups against join hashmap



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1270,70 +1247,117 @@ impl HashJoinStream {
     fn process_probe_batch(
         &mut self,
     ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        let state = self.state.try_as_process_probe_batch()?;
+        let state = self.state.try_as_process_probe_batch_mut()?;
         let build_side = self.build_side.try_as_ready_mut()?;
 
         self.join_metrics.input_batches.add(1);
         self.join_metrics.input_rows.add(state.batch.num_rows());
         let timer = self.join_metrics.join_time.timer();
 
-        let mut hashes_buffer = vec![];
-        // get the matched two indices for the on condition
-        let left_right_indices = build_equal_condition_join_indices(
+        // get the matched by join keys indices
+        let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
             build_side.left_data.hash_map(),
             build_side.left_data.batch(),
             &state.batch,
             &self.on_left,
             &self.on_right,
             &self.random_state,
             self.null_equals_null,
-            &mut hashes_buffer,
-            self.filter.as_ref(),
-            JoinSide::Left,
-            None,
-            true,
-        );
+            self.batch_size,
+            state.offset,
+        )?;
 
-        let result = match left_right_indices {
-            Ok((left_side, right_side)) => {
-                // set the left bitmap
-                // and only left, full, left semi, left anti need the left 
bitmap
-                if need_produce_result_in_final(self.join_type) {
-                    left_side.iter().flatten().for_each(|x| {
-                        build_side.visited_left_side.set_bit(x as usize, true);
-                    });
-                }
+        // apply join filter if exists
+        let (left_indices, right_indices) = if let Some(filter) = &self.filter 
{
+            apply_join_filter_to_indices(
+                build_side.left_data.batch(),
+                &state.batch,
+                left_indices,
+                right_indices,
+                filter,
+                JoinSide::Left,
+            )?
+        } else {
+            (left_indices, right_indices)
+        };
 
-                // adjust the two side indices base on the join type
-                let (left_side, right_side) = adjust_indices_by_join_type(
-                    left_side,
-                    right_side,
-                    state.batch.num_rows(),
-                    self.join_type,
-                );
+        // mark joined left-side indices as visited, if required by join type
+        if need_produce_result_in_final(self.join_type) {
+            left_indices.iter().flatten().for_each(|x| {
+                build_side.visited_left_side.set_bit(x as usize, true);
+            });
+        }
 
-                let result = build_batch_from_indices(
-                    &self.schema,
-                    build_side.left_data.batch(),
-                    &state.batch,
-                    &left_side,
-                    &right_side,
-                    &self.column_indices,
-                    JoinSide::Left,
-                );
-                self.join_metrics.output_batches.add(1);
-                self.join_metrics.output_rows.add(state.batch.num_rows());
-                result
-            }
-            Err(err) => {
-                exec_err!("Fail to build join indices in HashJoinExec, 
error:{err}")
-            }
+        // check if probe batch scanned based on `next_offset` returned from 
lookup function
+        let probe_batch_scanned = next_offset.is_none()
+            || next_offset.is_some_and(|(probe_idx, build_idx)| {
+                probe_idx + 1 >= state.batch.num_rows()
+                    && build_idx.is_some_and(|v| v == 0)
+            });
+
+        // The goals of index alignment for different join types are:
+        //
+        // 1) Right & FullJoin -- to append all missing probe-side indices 
between
+        //    previous (excluding) and current joined indices.
+        // 2) SemiJoin -- deduplicate probe indices in range between previous
+        //    (excluding) and current joined indices.
+        // 3) AntiJoin -- return only missing indices in range between
+        //    previous and current joined indices.
+        //    Inclusion/exclusion of the indices themselves don't matter
+        //
+        // As a summary -- alignment range can be produced based only on
+        // joined (matched with filters applied) probe side indices, excluding 
starting one
+        // (left from previous iteration).
+
+        // if any rows have been joined -- get last joined probe-side (right) 
row
+        // it's important that index counts as "joined" after hash collisions 
checks
+        // and join filters applied.
+        let last_joined_right_idx = match right_indices.len() {
+            0 => None,
+            n => Some(right_indices.value(n - 1) as usize),
+        };
+
+        // Calculate range and perform alignment.
+        // In case probe batch has been processed -- align all remaining rows.
+        let index_alignment_range_start = state.joined_probe_idx.map_or(0, |v| 
v + 1);
+        let index_alignment_range_end = if probe_batch_scanned {
+            state.batch.num_rows()
+        } else {
+            last_joined_right_idx.map_or(0, |v| v + 1)
         };
+
+        let (left_indices, right_indices) = adjust_indices_by_join_type(
+            left_indices,
+            right_indices,
+            index_alignment_range_start..index_alignment_range_end,
+            self.join_type,
+        );
+
+        let result = build_batch_from_indices(
+            &self.schema,
+            build_side.left_data.batch(),
+            &state.batch,
+            &left_indices,
+            &right_indices,
+            &self.column_indices,
+            JoinSide::Left,
+        )?;
+
+        self.join_metrics.output_batches.add(1);
+        self.join_metrics.output_rows.add(state.batch.num_rows());

Review Comment:
   It definitely should, thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to