alamb commented on code in PR #8020:
URL: https://github.com/apache/arrow-datafusion/pull/8020#discussion_r1383890655
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -878,16 +986,35 @@ pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
// With this approach, the lexicographic order on both the probe side and the build side is preserved.
let hash_map = build_hashmap.get_map();
let next_chain = build_hashmap.get_list();
- for (row, hash_value) in hash_values.iter().enumerate().rev() {
- // Get the hash and find it in the build index
+
+ let mut output_tuples = 0_usize;
+
+ // Get starting point in case resuming current probe-batch
+ let (initial_probe, initial_build) = state.start_mathching_iteration();
+
+ 'probe: for (row, hash_value) in hash_values.iter().enumerate().skip(initial_probe) {
+ let index = if state.partial_output() && row == initial_probe {
+ // using build index from state for the first row
+ // in case of partially skipped input
+ if initial_build == 0 {
+ continue;
+ }
+ Some(initial_build as u64)
+ } else if let Some((_, index)) =
+ hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
+ {
+ // otherwise -- checking build hashmap for precense of current hash_value
Review Comment:
```suggestion
// otherwise -- checking build hashmap for presence of current hash_value
```
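To make the resume logic in this hunk easier to follow (skip to `initial_probe`, then continue that first row from `initial_build`), here is a simplified, self-contained sketch of the same pattern. It is not the PR's implementation: it assumes a plain `HashMap<u64, Vec<usize>>` in place of the real hash map plus `next` chain, treats the saved build position as an offset into that row's list of matches, and the `match_with_limit` helper is hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified build side: join key -> build-row indices with that key.
/// (The real code walks a hash table plus a `next` chain instead.)
type BuildIndex = HashMap<u64, Vec<usize>>;

/// Emit at most `limit` (probe, build) pairs, starting from `resume`
/// (probe row, offset into that row's matches). Returns the pairs and the
/// position to resume from, or `None` once the probe batch is exhausted.
fn match_with_limit(
    build: &BuildIndex,
    probe_keys: &[u64],
    resume: (usize, usize),
    limit: usize,
) -> (Vec<(usize, usize)>, Option<(usize, usize)>) {
    assert!(limit > 0, "a zero limit would never make progress");
    let (start_probe, start_offset) = resume;
    let mut out = Vec::new();
    for (probe_row, key) in probe_keys.iter().enumerate().skip(start_probe) {
        let Some(build_rows) = build.get(key) else { continue };
        // Only the resumed row skips matches that were already emitted.
        let skip = if probe_row == start_probe { start_offset } else { 0 };
        for (offset, &build_row) in build_rows.iter().enumerate().skip(skip) {
            if out.len() == limit {
                // Output is full: remember exactly where to pick up next time.
                return (out, Some((probe_row, offset)));
            }
            out.push((probe_row, build_row));
        }
    }
    (out, None)
}
```

Calling it in a loop until it returns `None` yields the same pairs, in the same order, as one unbounded call, just split into chunks of at most `limit`.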
##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -920,71 +920,99 @@ pub(crate) fn append_right_indices(
}
}
-/// Get unmatched and deduplicated indices
+/// Get unmatched and deduplicated indices for specified range of indices
pub(crate) fn get_anti_indices(
Review Comment:
Could you maybe update this comment with an explanation of what `range`
represents? Specifically, is it a range of indices in `input_indices` to process?
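Reading the new body of `get_anti_indices` in this PR, `range` appears to be the window of row indices to consider: matched indices outside it are ignored, and the returned indices stay absolute rather than being re-based to zero. A minimal sketch of that behaviour, assuming `Vec<u32>` in place of Arrow's `UInt32Array` and a hypothetical name `anti_indices_in_range`:

```rust
use std::ops::Range;

/// Hypothetical Vec-based stand-in for `get_anti_indices(range, input_indices)`.
/// Returns the indices in `range` that do NOT appear in `matched`,
/// deduplicated and in ascending order.
fn anti_indices_in_range(range: Range<usize>, matched: &[u32]) -> Vec<u32> {
    let offset = range.start;
    // One flag per candidate index in `range`, stored relative to `offset`.
    let mut seen = vec![false; range.len()];
    for &v in matched {
        let v = v as usize;
        // Matches outside the window are ignored, like the `filter` in the diff.
        if range.contains(&v) {
            seen[v - offset] = true;
        }
    }
    // Unmatched indices are returned as absolute positions, not 0-based ones.
    range
        .filter(|&idx| !seen[idx - offset])
        .map(|idx| idx as u32)
        .collect()
}
```

For example, with `range = 2..6` and matches `[0, 3, 3, 5, 7]`, only `3` and `5` fall inside the window, so the result is `[2, 4]`.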
##########
datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt:
##########
@@ -72,11 +72,11 @@ SELECT t1.a, t1.b, t1.c, t2.a as a2
ON t1.d = t2.d ORDER BY a2, t2.b
LIMIT 5
----
-0 0 0 0
-0 0 2 0
-0 0 3 0
-0 0 6 0
-0 0 20 0
+1 3 95 0
Review Comment:
I don't think the build side order is important -- maybe we should just
update the test so it has a well-defined, deterministic output 🤔
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
Ok(())
}
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {
Review Comment:
Maybe `HashJoinOutputState` would be a more precise name. I don't feel
strongly.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
Ok(())
}
+// State for storing left/right side indices used for partial batch output
Review Comment:
Maybe as a follow-on PR we can move this structure into its own module
(perhaps something like `datafusion/physical-plan/src/joins/join_output.rs`).
##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -920,71 +920,99 @@ pub(crate) fn append_right_indices(
}
}
-/// Get unmatched and deduplicated indices
+/// Get unmatched and deduplicated indices for specified range of indices
pub(crate) fn get_anti_indices(
- row_count: usize,
+ range: Range<usize>,
input_indices: &UInt32Array,
) -> UInt32Array {
- let mut bitmap = BooleanBufferBuilder::new(row_count);
- bitmap.append_n(row_count, false);
- input_indices.iter().flatten().for_each(|v| {
- bitmap.set_bit(v as usize, true);
- });
+ let mut bitmap = BooleanBufferBuilder::new(range.len());
+ bitmap.append_n(range.len(), false);
+ input_indices
+ .iter()
+ .flatten()
+ .map(|v| v as usize)
+ .filter(|v| range.contains(v))
+ .for_each(|v| {
+ bitmap.set_bit(v - range.start, true);
+ });
+
+ let offset = range.start;
// get the anti index
- (0..row_count)
- .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u32))
+ (range)
+ .filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u32))
.collect::<UInt32Array>()
}
/// Get unmatched and deduplicated indices
pub(crate) fn get_anti_u64_indices(
Review Comment:
`get_anti_u64_indices` is always called with a range of `0..` -- what is
the reason for changing it to take a `Range<usize>`?
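When the range always starts at zero, the range-based signature should behave exactly like the old `row_count` one, which is what makes the extra parameter look unnecessary here. A small sketch with hypothetical `Vec<u64>`-based stand-ins for both versions (not the Arrow-based originals) makes the equivalence checkable:

```rust
use std::ops::Range;

/// Hypothetical stand-in for the old signature: every index in `0..row_count`
/// is a candidate. Assumes all matched indices are `< row_count`.
fn anti_u64_old(row_count: usize, matched: &[u64]) -> Vec<u64> {
    let mut seen = vec![false; row_count];
    for &v in matched {
        seen[v as usize] = true;
    }
    (0..row_count).filter(|&i| !seen[i]).map(|i| i as u64).collect()
}

/// Hypothetical stand-in for the new signature: only indices in `range` are candidates.
fn anti_u64_range(range: Range<usize>, matched: &[u64]) -> Vec<u64> {
    let offset = range.start;
    let mut seen = vec![false; range.len()];
    for &v in matched {
        let v = v as usize;
        if range.contains(&v) {
            seen[v - offset] = true;
        }
    }
    range.filter(|&i| !seen[i - offset]).map(|i| i as u64).collect()
}
```

With `0..row_count` the two agree, so the range form only pays off if some caller actually passes a non-zero start.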
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
Ok(())
}
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {
+ // total rows in current probe batch
+ probe_rows: usize,
+ // saved probe-build indices to resume matching from
+ last_matched_indices: Option<(usize, usize)>,
+ // current iteration has been updated
+ matched_indices_updated: bool,
+ // tracking last joined probe side index seen for further indices adjustment
+ last_joined_probe_index: Option<usize>,
+ // tracking last joined probe side index seen for further indices adjustment
Review Comment:
This comment is the same as the one above -- can you clarify what the difference is
between "last" and "prev"?
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -747,6 +753,97 @@ where
Ok(())
}
+// State for storing left/right side indices used for partial batch output
+// & producing ranges for adjusting indices
+#[derive(Debug, Default)]
+pub(crate) struct HashJoinStreamState {
+ // total rows in current probe batch
+ probe_rows: usize,
+ // saved probe-build indices to resume matching from
+ last_matched_indices: Option<(usize, usize)>,
+ // current iteration has been updated
+ matched_indices_updated: bool,
+ // tracking last joined probe side index seen for further indices adjustment
+ last_joined_probe_index: Option<usize>,
+ // tracking last joined probe side index seen for further indices adjustment
+ prev_joined_probe_index: Option<usize>,
+}
+
+impl HashJoinStreamState {
Review Comment:
Another way to represent a state machine like this in Rust is via an enum.
The upside is that it makes it more explicit which fields are used in which
states and which transitions are allowed.
Something like this, perhaps:
```rust
enum HashJoinOutputState {
    /// The hash table is still being built
    Hashing,
    /// The hash table has been built and a probe batch of `probe_rows` rows
    /// is being output, but nothing has been output yet
    Begin {
        probe_rows: usize,
    },
    /// Have output up to `last_matched_indices`
    Output {
        // saved probe-build indices to resume matching from
        last_matched_indices: Option<(usize, usize)>,
    },
    // ....
}
```
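A fuller, runnable version of the enum idea, with transitions written as methods that consume the old state. The extra `Done` variant, the methods `start_probe` and `after_output`, and the non-optional `last_matched_indices` field are hypothetical simplifications, not part of the PR; the point is that an invalid transition becomes an explicit `match` arm rather than an implicit combination of flags.

```rust
/// Hypothetical output state for the probe loop (names are illustrative only).
#[derive(Debug, Clone, PartialEq)]
enum HashJoinOutputState {
    /// The build-side hash table is still being collected.
    Hashing,
    /// A probe batch of `probe_rows` rows is ready, but nothing is output yet.
    Begin { probe_rows: usize },
    /// Part of the probe batch has been output; resume from these indices.
    Output { probe_rows: usize, last_matched_indices: (usize, usize) },
    /// The current probe batch has been fully processed.
    Done,
}

impl HashJoinOutputState {
    /// A new probe batch may only start once hashing or the previous batch is done.
    fn start_probe(self, probe_rows: usize) -> Self {
        match self {
            Self::Hashing | Self::Done => Self::Begin { probe_rows },
            state => panic!("probe batch still in progress: {state:?}"),
        }
    }

    /// Advance after emitting one output batch. `next` is where matching
    /// stopped, or `None` if the probe batch was exhausted.
    fn after_output(self, next: Option<(usize, usize)>) -> Self {
        match (self, next) {
            (
                Self::Begin { probe_rows } | Self::Output { probe_rows, .. },
                Some(last_matched_indices),
            ) => Self::Output { probe_rows, last_matched_indices },
            (Self::Begin { .. } | Self::Output { .. }, None) => Self::Done,
            // Output can only be produced while a probe batch is active.
            (state, _) => panic!("no probe batch to output from: {state:?}"),
        }
    }
}
```

Because each variant carries only the fields that are meaningful in that state, there is no way to read resume indices while still in `Hashing`.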
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1037,107 +1174,128 @@ impl HashJoinStream {
}
});
let mut hashes_buffer = vec![];
- self.right
- .poll_next_unpin(cx)
- .map(|maybe_batch| match maybe_batch {
- // one right batch in the join loop
- Some(Ok(batch)) => {
- self.join_metrics.input_batches.add(1);
- self.join_metrics.input_rows.add(batch.num_rows());
- let timer = self.join_metrics.join_time.timer();
-
- // get the matched two indices for the on condition
- let left_right_indices = build_equal_condition_join_indices(
- &left_data.0,
- &left_data.1,
- &batch,
- &self.on_left,
- &self.on_right,
- &self.random_state,
- self.null_equals_null,
- &mut hashes_buffer,
- self.filter.as_ref(),
- JoinSide::Left,
- None,
- );
- let result = match left_right_indices {
- Ok((left_side, right_side)) => {
- // set the left bitmap
- // and only left, full, left semi, left anti need the left bitmap
- if need_produce_result_in_final(self.join_type) {
- left_side.iter().flatten().for_each(|x| {
- visited_left_side.set_bit(x as usize, true);
- });
- }
-
- // adjust the two side indices base on the join type
- let (left_side, right_side) = adjust_indices_by_join_type(
- left_side,
- right_side,
- batch.num_rows(),
- self.join_type,
- );
-
- let result = build_batch_from_indices(
- &self.schema,
- &left_data.1,
- &batch,
- &left_side,
- &right_side,
- &self.column_indices,
- JoinSide::Left,
- );
- self.join_metrics.output_batches.add(1);
- self.join_metrics.output_rows.add(batch.num_rows());
- Some(result)
- }
- Err(err) => Some(exec_err!(
- "Fail to build join indices in HashJoinExec, error:{err}"
- )),
- };
- timer.done();
- result
+ // Fetch next probe batch
+ if self.probe_batch.is_none() {
+ match self.right.poll_next_unpin(cx) {
Review Comment:
You can use `ready!` here to avoid having to match on `Poll::*` -- something
like:
```rust
if self.probe_batch.is_none() {
    match ready!(self.right.poll_next_unpin(cx)) {
        Some(Ok(batch)) => {
            self.state.set_probe_rows(batch.num_rows());
            self.probe_batch = Some(batch);
        }
        None => {
            self.probe_batch = None;
        }
        Some(err) => return Poll::Ready(Some(err)),
    }
}
```
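A self-contained illustration of what `ready!` buys here, using `std::task::ready!` (stable since Rust 1.64; `futures::ready!` behaves the same way). To avoid needing a real stream and `Context`, the hypothetical `fetch_probe_batch` receives the poll result directly, `Vec<u32>` stands in for a `RecordBatch`, and unlike the suggestion it ends the output on `None`:

```rust
use std::task::{ready, Poll};

/// Hypothetical stand-in for the join stream: holds the current probe batch.
struct ProbeState {
    probe_batch: Option<Vec<u32>>,
}

impl ProbeState {
    /// `polled` is what polling the probe-side input returned. `ready!` returns
    /// `Poll::Pending` early, so the `match` only sees the `Ready` payload.
    fn fetch_probe_batch(
        &mut self,
        polled: Poll<Option<Result<Vec<u32>, String>>>,
    ) -> Poll<Option<Result<usize, String>>> {
        if self.probe_batch.is_none() {
            match ready!(polled) {
                Some(Ok(batch)) => self.probe_batch = Some(batch),
                // Input exhausted: end the output stream.
                None => return Poll::Ready(None),
                // Propagate input errors unchanged.
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
            }
        }
        // Report how many probe rows are now buffered.
        Poll::Ready(Some(Ok(self.probe_batch.as_ref().map_or(0, Vec::len))))
    }
}
```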
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1165,13 +1323,30 @@ mod tests {
use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder};
use arrow::datatypes::{DataType, Field, Schema};
- use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
+ use datafusion_common::{
+ assert_batches_eq, assert_batches_sorted_eq, assert_contains, ScalarValue,
+ };
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use hashbrown::raw::RawTable;
+ use rstest::*;
+ use rstest_reuse::{self, *};
+
+ fn div_ceil(a: usize, b: usize) -> usize {
+ (a + b - 1) / b
+ }
+
+ #[template]
+ #[rstest]
+ fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
Review Comment:
This is a nice way to test the logic across several batch sizes.
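For readers unfamiliar with `rstest`, the `#[values(...)]` template runs each test once per listed batch size. The plain-`std` sketch below expresses the same idea as a loop and uses the `div_ceil` helper from the diff to check the expected number of output batches. `split_into_batches` is a hypothetical stand-in for the join's output chunking, not DataFusion code.

```rust
/// Ceiling division, as defined in the test module of this PR.
/// (On Rust 1.73+, `usize::div_ceil` does the same.)
fn div_ceil(a: usize, b: usize) -> usize {
    (a + b - 1) / b
}

/// Hypothetical chunking: split `total_rows` output rows into batches of at
/// most `batch_size` rows and return each batch's row count.
fn split_into_batches(total_rows: usize, batch_size: usize) -> Vec<usize> {
    let mut sizes = Vec::new();
    let mut remaining = total_rows;
    while remaining > 0 {
        let n = remaining.min(batch_size);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

/// Loop equivalent of `#[values(8192, 10, 5, 2, 1)]`: same checks per batch size.
fn check_all_batch_sizes(total_rows: usize) {
    for batch_size in [8192, 10, 5, 2, 1] {
        let sizes = split_into_batches(total_rows, batch_size);
        assert_eq!(sizes.len(), div_ceil(total_rows, batch_size));
        assert_eq!(sizes.iter().sum::<usize>(), total_rows);
        assert!(sizes.iter().all(|&n| n <= batch_size));
    }
}
```

The `rstest` template has the advantage that each batch size shows up as a separately named test case, so a failure points directly at the offending size.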
##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -920,71 +920,99 @@ pub(crate) fn append_right_indices(
}
}
-/// Get unmatched and deduplicated indices
+/// Get unmatched and deduplicated indices for specified range of indices
pub(crate) fn get_anti_indices(
- row_count: usize,
+ range: Range<usize>,
input_indices: &UInt32Array,
) -> UInt32Array {
- let mut bitmap = BooleanBufferBuilder::new(row_count);
- bitmap.append_n(row_count, false);
- input_indices.iter().flatten().for_each(|v| {
- bitmap.set_bit(v as usize, true);
- });
+ let mut bitmap = BooleanBufferBuilder::new(range.len());
+ bitmap.append_n(range.len(), false);
+ input_indices
+ .iter()
+ .flatten()
+ .map(|v| v as usize)
+ .filter(|v| range.contains(v))
+ .for_each(|v| {
+ bitmap.set_bit(v - range.start, true);
+ });
+
+ let offset = range.start;
// get the anti index
- (0..row_count)
- .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u32))
+ (range)
+ .filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u32))
.collect::<UInt32Array>()
}
/// Get unmatched and deduplicated indices
pub(crate) fn get_anti_u64_indices(
- row_count: usize,
+ range: Range<usize>,
input_indices: &UInt64Array,
) -> UInt64Array {
- let mut bitmap = BooleanBufferBuilder::new(row_count);
- bitmap.append_n(row_count, false);
- input_indices.iter().flatten().for_each(|v| {
- bitmap.set_bit(v as usize, true);
- });
+ let mut bitmap = BooleanBufferBuilder::new(range.len());
+ bitmap.append_n(range.len(), false);
+ input_indices
+ .iter()
+ .flatten()
+ .map(|v| v as usize)
+ .filter(|v| range.contains(v))
+ .for_each(|v| {
+ bitmap.set_bit(v - range.start, true);
+ });
+
+ let offset = range.start;
// get the anti index
- (0..row_count)
- .filter_map(|idx| (!bitmap.get_bit(idx)).then_some(idx as u64))
+ (range)
+ .filter_map(|idx| (!bitmap.get_bit(idx - offset)).then_some(idx as u64))
.collect::<UInt64Array>()
}
-/// Get matched and deduplicated indices
+/// Get matched and deduplicated indices for specified range of indices
pub(crate) fn get_semi_indices(
- row_count: usize,
+ range: Range<usize>,
input_indices: &UInt32Array,
) -> UInt32Array {
- let mut bitmap = BooleanBufferBuilder::new(row_count);
- bitmap.append_n(row_count, false);
- input_indices.iter().flatten().for_each(|v| {
- bitmap.set_bit(v as usize, true);
- });
+ let mut bitmap = BooleanBufferBuilder::new(range.len());
+ bitmap.append_n(range.len(), false);
+ input_indices
+ .iter()
+ .flatten()
+ .map(|v| v as usize)
+ .filter(|v| range.contains(v))
+ .for_each(|v| {
+ bitmap.set_bit(v - range.start, true);
+ });
+
+ let offset = range.start;
// get the semi index
- (0..row_count)
- .filter_map(|idx| (bitmap.get_bit(idx)).then_some(idx as u32))
+ (range)
+ .filter_map(|idx| (bitmap.get_bit(idx - offset)).then_some(idx as u32))
.collect::<UInt32Array>()
}
/// Get matched and deduplicated indices
pub(crate) fn get_semi_u64_indices(
Review Comment:
Likewise, `get_semi_u64_indices` never seems to be called with a range that
doesn't start with zero.
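One property worth testing if the range parameter is kept: for any range, zero-based or not, the semi and anti outputs are disjoint and together cover the range. A sketch with hypothetical `Vec<u64>` stand-ins (not the Arrow-based functions in the PR):

```rust
use std::ops::Range;

/// Flags, relative to `range.start`, for every index in `range` that appears in `matched`.
fn matched_flags(range: &Range<usize>, matched: &[u64]) -> Vec<bool> {
    let mut seen = vec![false; range.len()];
    for &v in matched {
        let v = v as usize;
        if range.contains(&v) {
            seen[v - range.start] = true;
        }
    }
    seen
}

/// Hypothetical stand-in for `get_semi_u64_indices`: matched indices in `range`.
fn semi_u64_in_range(range: Range<usize>, matched: &[u64]) -> Vec<u64> {
    let seen = matched_flags(&range, matched);
    let offset = range.start;
    range.filter(|&i| seen[i - offset]).map(|i| i as u64).collect()
}

/// Hypothetical stand-in for `get_anti_u64_indices`: unmatched indices in `range`.
fn anti_u64_in_range(range: Range<usize>, matched: &[u64]) -> Vec<u64> {
    let seen = matched_flags(&range, matched);
    let offset = range.start;
    range.filter(|&i| !seen[i - offset]).map(|i| i as u64).collect()
}
```

For `2..6` with matches `[0, 3, 3, 5, 7]`, semi yields `[3, 5]` and anti yields `[2, 4]`.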