alamb commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1383890655


##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -878,16 +986,35 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
     // With this approach, the lexicographic order on both the probe side and the build side is preserved.
     let hash_map = build_hashmap.get_map();
     let next_chain = build_hashmap.get_list();
-    for (row, hash_value) in hash_values.iter().enumerate().rev() {
-        // Get the hash and find it in the build index
+
+    let mut output_tuples = 0_usize;
+
+    // Get starting point in case resuming current probe-batch
+    let (initial_probe, initial_build) = state.start_mathching_iteration();
+
+    'probe: for (row, hash_value) in hash_values.iter().enumerate().skip(initial_probe) {
+        let index = if state.partial_output() && row == initial_probe {
+            // using build index from state for the first row
+            // in case of partially skipped input
+            if initial_build == 0 {
+                continue;
+            }
+            Some(initial_build as u64)
+        } else if let Some((_, index)) =
+            hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
+        {
+            // otherwise -- checking build hashmap for precense of current hash_value

Review Comment:
   ```suggestion
               // otherwise -- checking build hashmap for presence of current hash_value
   ```



##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -920,71 +920,99 @@ pub(crate) fn append_right_indices(
     }
 }
 
-/// Get unmatched and deduplicated indices
+/// Get unmatched and deduplicated indices for specified range of indices
 pub(crate) fn get_anti_indices(

Review Comment:
   Maybe you could update this comment with an explanation of what `range` represents? Specifically, is it a range of indices in `input_indices` to process?
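For illustration, a std-only sketch of one plausible reading of `range`, assuming it is the window of probe-side row indices covered by the current (partial) output batch: indices in `input_indices` outside the window are ignored, and the returned indices stay absolute rather than being shifted by `range.start`, as in the diff's `filter_map`. The function name and the use of `Vec<bool>` and `&[Option<u32>]` in place of Arrow's `BooleanBufferBuilder` and `UInt32Array` are assumptions; this is not the PR's code.

```rust
use std::ops::Range;

/// Hypothetical std-only stand-in for `get_anti_indices`.
/// Returns every index in `range` that does NOT appear in `input_indices`,
/// ascending and deduplicated. Input indices outside `range` are ignored,
/// and returned indices are absolute (not relative to `range.start`).
fn anti_indices_in_range(range: Range<usize>, input_indices: &[Option<u32>]) -> Vec<u32> {
    let offset = range.start;
    // One flag per index in the window, addressed relative to `range.start`
    let mut seen = vec![false; range.len()];
    for v in input_indices.iter().flatten().map(|&v| v as usize) {
        if range.contains(&v) {
            seen[v - offset] = true;
        }
    }
    // Keep the indices in the window that were never matched
    range
        .filter(|&idx| !seen[idx - offset])
        .map(|idx| idx as u32)
        .collect()
}
```

For example, with `range = 2..6` and inputs `[1, 3, null, 3, 7]`, only `3` falls inside the window, so the result is `[2, 4, 5]`.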



##########
datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt:
##########
@@ -72,11 +72,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2
  ON t1.d = t2.d ORDER BY a2, t2.b
  LIMIT 5
 ----
-0 0 0 0
-0 0 2 0
-0 0 3 0
-0 0 6 0
-0 0 20 0
+1 3 95 0

Review Comment:
   I don't think the build side order is important -- maybe we should just update the test so it has a well-defined, deterministic output 🤔 
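Assuming the old and new expected rows differ only in how ties on the sort key are broken, the effect is easy to reproduce outside the join: with `ORDER BY a2, t2.b LIMIT 5`, rows that tie on `(a2, t2.b)` may come out in any order, so which rows survive the `LIMIT` depends on the order the join emits them. A small Rust sketch, with hypothetical `(a2, b, c)` tuples standing in for result rows; extending the sort key to a total order is one way to make the expected output deterministic.

```rust
/// Sort by `(a2, b)` only, like `ORDER BY a2, t2.b`, then keep `limit` rows.
/// `sort_by_key` is stable, so rows tying on `(a2, b)` keep their input order,
/// which means the surviving rows depend on the input order.
fn top_n_by_partial_key(mut rows: Vec<(i32, i32, i32)>, limit: usize) -> Vec<(i32, i32, i32)> {
    rows.sort_by_key(|&(a2, b, _)| (a2, b));
    rows.truncate(limit);
    rows
}

/// Same, but every column takes part in the key (a total order),
/// so the result no longer depends on the input order.
fn top_n_by_total_key(mut rows: Vec<(i32, i32, i32)>, limit: usize) -> Vec<(i32, i32, i32)> {
    rows.sort_by_key(|&(a2, b, c)| (a2, b, c));
    rows.truncate(limit);
    rows
}
```

Feeding the same rows in two different orders changes the output of `top_n_by_partial_key` but not of `top_n_by_total_key`.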



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
     Ok(())
 }
 
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {

Review Comment:
   Maybe `HashJoinOutputState` would be a more precise name. I don't feel strongly.



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
     Ok(())
 }
 
+// State for storing left/right side indices used for partial batch output

Review Comment:
   Maybe as a follow-on PR we can move this structure into its own module (perhaps something like `datafusion/physical-plan/src/joins/join_output.rs`).



##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -920,71 +920,99 @@ pub(crate) fn append_right_indices(
     }
 }
 
-/// Get unmatched and deduplicated indices
+/// Get unmatched and deduplicated indices for specified range of indices
 pub(crate) fn get_anti_indices(
-    row_count: usize,
+    range: Range<usize>,
     input_indices: &UInt32Array,
 ) -> UInt32Array {
-    let mut bitmap = BooleanBufferBuilder::new(row_count);
-    bitmap.append_n(row_count, false);
-    input_indices.iter().flatten().for_each(|v| {
-        bitmap.set_bit(v as usize, true);
-    });
+    let mut bitmap = BooleanBufferBuilder::new(range.len());
+    bitmap.append_n(range.len(), false);
+    input_indices
+        .iter()
+        .flatten()
+        .map(|v| v as usize)
+        .filter(|v| range.contains(v))
+        .for_each(|v| {
+            bitmap.set_bit(v - range.start, true);
+        });
+
+    let offset = range.start;
 
     // get the anti index
-    (0..row_count)
-        .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u32))
+    (range)
+        .filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u32))
         .collect::<UInt32Array>()
 }
 
 /// Get unmatched and deduplicated indices
 pub(crate) fn get_anti_u64_indices(

Review Comment:
   `get_anti_u64_indices` is always called with a range of `0..` -- what is the reason for changing it to take a `Range<usize>`?
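When every caller passes `0..row_count`, `range.start` is zero, the `range.contains` filter only discards indices at or beyond `row_count`, and the result matches the old `row_count` form. A std-only sketch of the two shapes side by side; the function names are hypothetical, and `Vec<bool>` and `&[Option<u64>]` stand in for `BooleanBufferBuilder` and `UInt64Array`.

```rust
use std::ops::Range;

/// Hypothetical mirror of the pre-PR shape: every index in `0..row_count`
/// is a candidate.
fn anti_u64_by_count(row_count: usize, input_indices: &[Option<u64>]) -> Vec<u64> {
    let mut seen = vec![false; row_count];
    for v in input_indices.iter().flatten() {
        // Assumes every input index is < row_count (otherwise this panics)
        seen[*v as usize] = true;
    }
    (0..row_count)
        .filter(|&idx| !seen[idx])
        .map(|idx| idx as u64)
        .collect()
}

/// Hypothetical mirror of the PR's shape: only indices inside `range`
/// are candidates; indices outside it are ignored.
fn anti_u64_by_range(range: Range<usize>, input_indices: &[Option<u64>]) -> Vec<u64> {
    let offset = range.start;
    let mut seen = vec![false; range.len()];
    for v in input_indices.iter().flatten().map(|&v| v as usize) {
        if range.contains(&v) {
            seen[v - offset] = true;
        }
    }
    range
        .filter(|&idx| !seen[idx - offset])
        .map(|idx| idx as u64)
        .collect()
}
```

For in-bounds inputs, `anti_u64_by_range(0..n, xs)` and `anti_u64_by_count(n, xs)` agree, so the extra generality only matters if some caller passes a non-zero start.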



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
     Ok(())
 }
 
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {
+    // total rows in current probe batch
+    probe_rows: usize,
+    // saved probe-build indices to resume matching from
+    last_matched_indices: Option<(usize, usize)>,
+    // current iteration has been updated
+    matched_indices_updated: bool,
+    // tracking last joined probe side index seen for further indices adjustment
+    last_joined_probe_index: Option<usize>,
+    // tracking last joined probe side index seen for further indices adjustment

Review Comment:
   This comment is the same as the one above -- can you clarify the difference between "last" and "prev"?



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
     Ok(())
 }
 
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {
+    // total rows in current probe batch
+    probe_rows: usize,
+    // saved probe-build indices to resume matching from
+    last_matched_indices: Option<(usize, usize)>,
+    // current iteration has been updated
+    matched_indices_updated: bool,
+    // tracking last joined probe side index seen for further indices adjustment
+    last_joined_probe_index: Option<usize>,
+    // tracking last joined probe side index seen for further indices adjustment
+    prev_joined_probe_index: Option<usize>,
+}
+
+impl HashJoinStreamState {

Review Comment:
   Another way to represent a state machine like this in Rust is via an enum. The upside is that it makes it more explicit which fields are used in which states and which transitions are allowed.
   
   Something like this, perhaps:
   ```rust
   enum HashJoinOutputState {
     /// The hash table is still being built
     Hashing,
     /// The hash table has been built, and a batch of `probe_rows` is being
     /// output, but nothing has been output yet
     Begin {
       probe_rows: usize,
     },
     /// Have output up to `last_matched_indices`
     Output {
       // saved probe-build indices to resume matching from
       last_matched_indices: Option<(usize, usize)>,
     },
     // ...
   }
   ```
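Expanding that idea into a compilable sketch: each variant holds only the fields its state needs, and transitions are methods that consume the old state, so invalid transitions are rejected in one place. The `Done` variant and the method names `start_probe_batch` and `emitted` are hypothetical and not taken from the PR; `last_matched_indices` is no longer an `Option` because being in `Output` already implies a saved position.

```rust
/// Hypothetical sketch: every state carries only the fields it uses, and
/// transitions consume the old state and return the new one.
#[derive(Debug, PartialEq)]
enum HashJoinOutputState {
    /// The hash table is still being built
    Hashing,
    /// A probe batch of `probe_rows` rows is available; nothing output yet
    Begin { probe_rows: usize },
    /// Output emitted up to `last_matched_indices` (probe, build); resume there.
    /// No `Option` needed: being in this state implies a saved position.
    Output {
        probe_rows: usize,
        last_matched_indices: (usize, usize),
    },
    /// The current probe batch has been fully processed
    Done,
}

impl HashJoinOutputState {
    /// A new probe batch arrives once the hash table is built
    fn start_probe_batch(self, probe_rows: usize) -> Self {
        match self {
            Self::Hashing | Self::Done => Self::Begin { probe_rows },
            // Panicking keeps the sketch short; real code might return an error
            other => panic!("cannot start a probe batch from {other:?}"),
        }
    }

    /// An output batch was produced; `resume_at` is where matching should
    /// continue, or `None` if the probe batch is finished
    fn emitted(self, resume_at: Option<(usize, usize)>) -> Self {
        let probe_rows = match self {
            Self::Begin { probe_rows } | Self::Output { probe_rows, .. } => probe_rows,
            other => panic!("cannot emit output from {other:?}"),
        };
        match resume_at {
            Some(last_matched_indices) => Self::Output { probe_rows, last_matched_indices },
            None => Self::Done,
        }
    }
}
```

A typical sequence is `Hashing -> Begin -> Output -> ... -> Done -> Begin`, and a call such as `Hashing.emitted(..)` fails immediately instead of silently reading stale fields.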
   



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1037,107 +1174,128 @@ impl HashJoinStream {
             }
         });
         let mut hashes_buffer = vec![];
-        self.right
-            .poll_next_unpin(cx)
-            .map(|maybe_batch| match maybe_batch {
-                // one right batch in the join loop
-                Some(Ok(batch)) => {
-                    self.join_metrics.input_batches.add(1);
-                    self.join_metrics.input_rows.add(batch.num_rows());
-                    let timer = self.join_metrics.join_time.timer();
-
-                    // get the matched two indices for the on condition
-                    let left_right_indices = build_equal_condition_join_indices(
-                        &left_data.0,
-                        &left_data.1,
-                        &batch,
-                        &self.on_left,
-                        &self.on_right,
-                        &self.random_state,
-                        self.null_equals_null,
-                        &mut hashes_buffer,
-                        self.filter.as_ref(),
-                        JoinSide::Left,
-                        None,
-                    );
 
-                    let result = match left_right_indices {
-                        Ok((left_side, right_side)) => {
-                            // set the left bitmap
-                            // and only left, full, left semi, left anti need the left bitmap
-                            if need_produce_result_in_final(self.join_type) {
-                                left_side.iter().flatten().for_each(|x| {
-                                    visited_left_side.set_bit(x as usize, true);
-                                });
-                            }
-
-                            // adjust the two side indices base on the join type
-                            let (left_side, right_side) = adjust_indices_by_join_type(
-                                left_side,
-                                right_side,
-                                batch.num_rows(),
-                                self.join_type,
-                            );
-
-                            let result = build_batch_from_indices(
-                                &self.schema,
-                                &left_data.1,
-                                &batch,
-                                &left_side,
-                                &right_side,
-                                &self.column_indices,
-                                JoinSide::Left,
-                            );
-                            self.join_metrics.output_batches.add(1);
-                            self.join_metrics.output_rows.add(batch.num_rows());
-                            Some(result)
-                        }
-                        Err(err) => Some(exec_err!(
-                            "Fail to build join indices in HashJoinExec, error:{err}"
-                        )),
-                    };
-                    timer.done();
-                    result
+        // Fetch next probe batch
+        if self.probe_batch.is_none() {
+            match self.right.poll_next_unpin(cx) {

Review Comment:
   You can use `ready!` here to avoid having to match on `Poll::*` -- something like:
   
   ```rust
           if self.probe_batch.is_none() {
               match ready!(self.right.poll_next_unpin(cx)) {
                   Some(Ok(batch)) => {
                       self.state.set_probe_rows(batch.num_rows());
                       self.probe_batch = Some(batch);
                   }
                   None => {
                       self.probe_batch = None;
                   }
               Some(Err(e)) => return Poll::Ready(Some(Err(e))),
               }
           }
   ```
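A self-contained sketch of the same pattern using `std::task::ready!` (stable since Rust 1.64; `futures::ready!` behaves the same way). `MockProbeSide`, `poll_probe_batch`, and the `usize` "batches" are hypothetical stand-ins for DataFusion's probe stream and `RecordBatch`; unlike the snippet above, the sketch returns `Poll::Ready(None)` when the input is exhausted.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the probe-side stream. Each item is a "batch"
/// represented only by its row count; the stream returns `Poll::Pending`
/// once before every item to mimic waiting on I/O.
struct MockProbeSide {
    items: VecDeque<Result<usize, String>>,
    pending_next: bool,
}

impl MockProbeSide {
    fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, String>>> {
        if self.pending_next {
            self.pending_next = false;
            // Ask to be polled again, as a real stream would once data arrives
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.pending_next = true;
        Poll::Ready(self.items.pop_front())
    }
}

/// Fetches a probe batch if none is buffered. `ready!` returns
/// `Poll::Pending` from this function early, so only the payload of
/// `Poll::Ready` needs to be matched.
fn poll_probe_batch(
    right: &mut MockProbeSide,
    probe_batch: &mut Option<usize>,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<usize, String>>> {
    if probe_batch.is_none() {
        match ready!(right.poll_next_unpin(cx)) {
            Some(Ok(batch)) => *probe_batch = Some(batch),
            // Probe side exhausted
            None => return Poll::Ready(None),
            // Propagate the error unchanged
            Some(Err(e)) => return Poll::Ready(Some(Err(e))),
        }
    }
    // A real join would probe the hash table here; the sketch hands the batch back
    Poll::Ready(probe_batch.take().map(Ok))
}

/// Waker that does nothing, for driving `poll_probe_batch` by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWaker))
}
```

Each `Poll::Pending` from the mock surfaces unchanged from `poll_probe_batch`, with no explicit `Poll::Pending` arm in the `match`.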



##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1165,13 +1323,30 @@ mod tests {
 
     use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
     use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
+    use datafusion_common::{
+        assert_batches_eq, assert_batches_sorted_eq, assert_contains, ScalarValue,
+    };
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
 
     use hashbrown::raw::RawTable;
+    use rstest::*;
+    use rstest_reuse::{self, *};
+
+    fn div_ceil(a: usize, b: usize) -> usize {
+        (a + b - 1) / b
+    }
+
+    #[template]
+    #[rstest]
+    fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}

Review Comment:
   This is a nice way to test the logic.
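The template runs each test once per `batch_size` value, so the small sizes force the join to stop and resume partway through a probe batch. A std-only sketch of invariants such a test can check, reusing the `div_ceil` helper from the diff; `emit_in_batches` is a hypothetical stand-in for the join's batched output, not a DataFusion function.

```rust
/// Same helper as in the test module: ceiling of `a / b` for `b > 0`.
fn div_ceil(a: usize, b: usize) -> usize {
    (a + b - 1) / b
}

/// Hypothetical stand-in for a join that emits at most `batch_size` rows
/// per output batch: splits `rows` into consecutive chunks.
fn emit_in_batches(rows: &[u32], batch_size: usize) -> Vec<Vec<u32>> {
    rows.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}
```

For each batch size, the number of batches should equal `div_ceil(total_rows, batch_size)`, no batch should exceed `batch_size`, and concatenating the batches should reproduce the unbatched output in order.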



##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -920,71 +920,99 @@ pub(crate) fn append_right_indices(
     }
 }
 
-/// Get unmatched and deduplicated indices
+/// Get unmatched and deduplicated indices for specified range of indices
 pub(crate) fn get_anti_indices(
-    row_count: usize,
+    range: Range<usize>,
     input_indices: &UInt32Array,
 ) -> UInt32Array {
-    let mut bitmap = BooleanBufferBuilder::new(row_count);
-    bitmap.append_n(row_count, false);
-    input_indices.iter().flatten().for_each(|v| {
-        bitmap.set_bit(v as usize, true);
-    });
+    let mut bitmap = BooleanBufferBuilder::new(range.len());
+    bitmap.append_n(range.len(), false);
+    input_indices
+        .iter()
+        .flatten()
+        .map(|v| v as usize)
+        .filter(|v| range.contains(v))
+        .for_each(|v| {
+            bitmap.set_bit(v - range.start, true);
+        });
+
+    let offset = range.start;
 
     // get the anti index
-    (0..row_count)
-        .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u32))
+    (range)
+        .filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u32))
         .collect::<UInt32Array>()
 }
 
 /// Get unmatched and deduplicated indices
 pub(crate) fn get_anti_u64_indices(
-    row_count: usize,
+    range: Range<usize>,
     input_indices: &UInt64Array,
 ) -> UInt64Array {
-    let mut bitmap = BooleanBufferBuilder::new(row_count);
-    bitmap.append_n(row_count, false);
-    input_indices.iter().flatten().for_each(|v| {
-        bitmap.set_bit(v as usize, true);
-    });
+    let mut bitmap = BooleanBufferBuilder::new(range.len());
+    bitmap.append_n(range.len(), false);
+    input_indices
+        .iter()
+        .flatten()
+        .map(|v| v as usize)
+        .filter(|v| range.contains(v))
+        .for_each(|v| {
+            bitmap.set_bit(v - range.start, true);
+        });
+
+    let offset = range.start;
 
     // get the anti index
-    (0..row_count)
-        .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u64))
+    (range)
+        .filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u64))
         .collect::<UInt64Array>()
 }
 
-/// Get matched and deduplicated indices
+/// Get matched and deduplicated indices for specified range of indices
 pub(crate) fn get_semi_indices(
-    row_count: usize,
+    range: Range<usize>,
     input_indices: &UInt32Array,
 ) -> UInt32Array {
-    let mut bitmap = BooleanBufferBuilder::new(row_count);
-    bitmap.append_n(row_count, false);
-    input_indices.iter().flatten().for_each(|v| {
-        bitmap.set_bit(v as usize, true);
-    });
+    let mut bitmap = BooleanBufferBuilder::new(range.len());
+    bitmap.append_n(range.len(), false);
+    input_indices
+        .iter()
+        .flatten()
+        .map(|v| v as usize)
+        .filter(|v| range.contains(v))
+        .for_each(|v| {
+            bitmap.set_bit(v - range.start, true);
+        });
+
+    let offset = range.start;
 
     // get the semi index
-    (0..row_count)
-        .filter_map(|idx| (bitmap.get_bit(idx)).then_some(idx as u32))
+    (range)
+        .filter_map(|idx| (bitmap.get_bit(idx - offset)).then_some(idx as u32))
         .collect::<UInt32Array>()
 }
 
 /// Get matched and deduplicated indices
 pub(crate) fn get_semi_u64_indices(

Review Comment:
   Likewise, `get_semi_u64_indices` never seems to be called with a range that doesn't start at zero.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
