metesynnada commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1455734159


##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -665,6 +668,8 @@ impl ExecutionPlan for HashJoinExec {
             reservation,
             state: HashJoinStreamState::WaitBuildSide,
             build_side: BuildSide::Initial(BuildSideInitialState { left_fut }),
+            batch_size,

Review Comment:
   I am not comfortable with using the batch size as the limit, since it does
not take memory into account. However, I am unsure how to pick a better
heuristic value.



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1270,70 +1252,118 @@ impl HashJoinStream {
     fn process_probe_batch(
         &mut self,
     ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
-        let state = self.state.try_as_process_probe_batch()?;
+        let state = self.state.try_as_process_probe_batch_mut()?;
         let build_side = self.build_side.try_as_ready_mut()?;
 
         self.join_metrics.input_batches.add(1);
         self.join_metrics.input_rows.add(state.batch.num_rows());
         let timer = self.join_metrics.join_time.timer();
 
-        let mut hashes_buffer = vec![];
-        // get the matched two indices for the on condition
-        let left_right_indices = build_equal_condition_join_indices(
+        // get the matched by join keys indices
+        let (left_indices, right_indices, next_offset) = lookup_join_hashmap(
             build_side.left_data.hash_map(),
             build_side.left_data.batch(),
             &state.batch,
             &self.on_left,
             &self.on_right,
             &self.random_state,
             self.null_equals_null,
-            &mut hashes_buffer,
-            self.filter.as_ref(),
-            JoinSide::Left,
-            None,
-            true,
-        );
+            &mut self.hashes_buffer,
+            self.batch_size,
+            state.offset,
+        )?;
 
-        let result = match left_right_indices {
-            Ok((left_side, right_side)) => {
-                // set the left bitmap
-                // and only left, full, left semi, left anti need the left 
bitmap
-                if need_produce_result_in_final(self.join_type) {
-                    left_side.iter().flatten().for_each(|x| {
-                        build_side.visited_left_side.set_bit(x as usize, true);
-                    });
-                }
+        // apply join filter if exists
+        let (left_indices, right_indices) = if let Some(filter) = &self.filter 
{
+            apply_join_filter_to_indices(
+                build_side.left_data.batch(),
+                &state.batch,
+                left_indices,
+                right_indices,
+                filter,
+                JoinSide::Left,
+            )?
+        } else {
+            (left_indices, right_indices)
+        };
 
-                // adjust the two side indices base on the join type
-                let (left_side, right_side) = adjust_indices_by_join_type(
-                    left_side,
-                    right_side,
-                    state.batch.num_rows(),
-                    self.join_type,
-                );
+        // mark joined left-side indices as visited, if required by join type
+        if need_produce_result_in_final(self.join_type) {
+            left_indices.iter().flatten().for_each(|x| {
+                build_side.visited_left_side.set_bit(x as usize, true);
+            });
+        }
 
-                let result = build_batch_from_indices(
-                    &self.schema,
-                    build_side.left_data.batch(),
-                    &state.batch,
-                    &left_side,
-                    &right_side,
-                    &self.column_indices,
-                    JoinSide::Left,
-                );
-                self.join_metrics.output_batches.add(1);
-                self.join_metrics.output_rows.add(state.batch.num_rows());
-                result
-            }
-            Err(err) => {
-                exec_err!("Fail to build join indices in HashJoinExec, 
error:{err}")
-            }
+        // check if probe batch scanned based on `next_offset` returned from 
lookup function
+        let probe_batch_scanned = next_offset.is_none()
+            || next_offset.is_some_and(|(probe_idx, build_idx)| {
+                probe_idx + 1 >= state.batch.num_rows()
+                    && build_idx.is_some_and(|v| v == 0)
+            });
+
+        // The goals of index alignment for different join types are:
+        //
+        // 1) Right & FullJoin -- to append all missing probe-side indices 
between
+        //    previous (excluding) and current joined indices.
+        // 2) SemiJoin -- deduplicate probe indices in range between previous
+        //    (excluding) and current joined indices.
+        // 3) AntiJoin -- return only missing indices in range between
+        //    previous and current joined indices.
+        //    Inclusion/exclusion of the indices themselves don't matter
+        //
+        // As a summary -- alignment range can be produced based only on
+        // joined (matched with filters applied) probe side indices, excluding 
starting one
+        // (left from previous iteration).
+
+        // if any rows have been joined -- get last joined probe-side (right) 
row
+        // it's important that index counts as "joined" after hash collisions 
checks
+        // and join filters applied.
+        let last_joined_right_idx = match right_indices.len() {
+            0 => None,
+            n => Some(right_indices.value(n - 1) as usize),
+        };
+
+        // Calculate range and perform alignment.
+        // In case probe batch has been processed -- align all remaining rows.
+        let index_alignment_range_start = state.joined_probe_idx.map_or(0, |v| 
v + 1);
+        let index_alignment_range_end = if probe_batch_scanned {
+            state.batch.num_rows()
+        } else {
+            last_joined_right_idx.map_or(0, |v| v + 1)
         };
+
+        let (left_indices, right_indices) = adjust_indices_by_join_type(
+            left_indices,
+            right_indices,
+            index_alignment_range_start..index_alignment_range_end,
+            self.join_type,
+        );
+
+        let result = build_batch_from_indices(
+            &self.schema,
+            build_side.left_data.batch(),
+            &state.batch,
+            &left_indices,
+            &right_indices,
+            &self.column_indices,
+            JoinSide::Left,
+        )?;
+
+        self.join_metrics.output_batches.add(1);
+        self.join_metrics.output_rows.add(result.num_rows());
         timer.done();
 
-        self.state = HashJoinStreamState::FetchProbeBatch;
+        if probe_batch_scanned {

Review Comment:
   For state machine handling, please take a look at the `EagerJoinStream` trait.
It encapsulates all of the build- and probe-side data pulling and the state
management, which means state handling and join calculations (inside the
implementors of the trait) are visibly separated. This makes the code quite easy
to read. You can follow a layered approach like `EagerJoinStream` and SHJ
(symmetric hash join). In other words, do not alter the state inside the method
that calculates the join.



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1023,11 +1048,9 @@ pub fn build_equal_condition_join_indices<T: 
JoinHashMapType>(
     random_state: &RandomState,
     null_equals_null: bool,
     hashes_buffer: &mut Vec<u64>,
-    filter: Option<&JoinFilter>,
-    build_side: JoinSide,
-    deleted_offset: Option<usize>,
-    fifo_hashmap: bool,
-) -> Result<(UInt64Array, UInt32Array)> {
+    limit: usize,
+    offset: JoinHashMapOffset,
+) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
     let keys_values = probe_on

Review Comment:
   I believe the probe hashes are recalculated unnecessarily on each call while
the probe batch has not been fully processed yet. To improve efficiency, the
offset could be used to avoid recomputing them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to