This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2a6f3aafca perf: Improve NLJ for very small right side case (#17562)
2a6f3aafca is described below
commit 2a6f3aafcaf3358822d134a721a767bb3676f3b5
Author: Yongting You <[email protected]>
AuthorDate: Thu Nov 13 14:48:08 2025 +0800
perf: Improve NLJ for very small right side case (#17562)
## Which issue does this PR close?
- Closes https://github.com/apache/datafusion/issues/17547
## Rationale for this change
See the issue for background. When the optimizer picks the wrong join order and places a very small input on the probe (right) side of the NLJ, the operator now handles it much faster than before.
Implementation: previously, the inner loop always joined a single left row with a right batch (`one_left_row X right_batch`). With this PR, when the right batch is very small, the inner loop joins multiple left rows with that right batch at once.
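A minimal sketch of the dispatch decision described above, in plain Rust. The function name `left_rows_per_step` is hypothetical. The `batch_size / right_rows` ratio and the threshold of 10 are taken from the diff in this commit, where `batch_size` corresponds to `datafusion.execution.batch_size`.

```rust
/// Hypothetical helper mirroring the new branch: returns how many left rows
/// to join with the current right batch in one step, or `None` when the
/// regular one-left-row-at-a-time path should be used instead.
fn left_rows_per_step(batch_size: usize, right_rows: usize, remaining_left: usize) -> Option<usize> {
    // The operator skips empty right batches before reaching this point.
    assert!(right_rows > 0, "right batch must be non-empty");
    // How many copies of the right batch fit into one output-sized batch.
    let ratio = batch_size / right_rows;
    if ratio > 10 {
        // Never take more left rows than are still unprocessed.
        Some(ratio.min(remaining_left))
    } else {
        None
    }
}

fn main() {
    // 1-row right batch: 8192 / 1 = 8192, capped by the 1000 remaining left rows.
    println!("{:?}", left_rows_per_step(8192, 1, 1000)); // Some(1000)
    // 100-row right batch: 8192 / 100 = 81 > 10.
    println!("{:?}", left_rows_per_step(8192, 100, 1000)); // Some(81)
    // 1000-row right batch: 8192 / 1000 = 8, so the regular path is used.
    println!("{:?}", left_rows_per_step(8192, 1000, 1000)); // None
}
```

Capping at `batch_size / right_rows` keeps the intermediate Cartesian batch at roughly one output batch in size, so the per-batch overheads (filter evaluation, `take`) are amortized without producing oversized batches.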
NLJ microbenchmark results (only Q13 targets this workload):
```
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Query ┃ before ┃ improve-nlj-small-right ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 85.31 ms │ 85.75 ms │ no change │
│ QQuery 2 │ 111.36 ms │ 109.88 ms │ no change │
│ QQuery 3 │ 180.99 ms │ 181.56 ms │ no change │
│ QQuery 4 │ 340.38 ms │ 355.24 ms │ no change │
│ QQuery 5 │ 248.62 ms │ 231.90 ms │ +1.07x faster │
│ QQuery 6 │ 1680.89 ms │ 1682.07 ms │ no change │
│ QQuery 7 │ 233.65 ms │ 234.83 ms │ no change │
│ QQuery 8 │ 1679.12 ms │ 1675.63 ms │ no change │
│ QQuery 9 │ 266.52 ms │ 266.54 ms │ no change │
│ QQuery 10 │ 544.66 ms │ 544.71 ms │ no change │
│ QQuery 11 │ 274.43 ms │ 265.71 ms │ no change │
│ QQuery 12 │ 275.11 ms │ 274.72 ms │ no change │
│ QQuery 13 │ 76.56 ms │ 1.88 ms │ +40.70x faster │
└──────────────┴────────────┴─────────────────────────┴────────────────┘
```
For comparison, this query takes around 4 ms in DataFusion 49 (DF49).
## What changes are included in this PR?
- Added one microbenchmark query targeting the small-right-input workload.
- Added a branch to the NLJ's right-input handling logic: if the current right batch is very small, join it with multiple left rows at once.
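The new branch pairs a contiguous range of left rows with every row of the small right batch. The sketch below models only the index construction, using plain `Vec<u32>`; in the commit, the same index vectors are built as arrow `UInt32Array`s and used with arrow's `take` to gather columns. The names `cartesian_indices` and `pair_of` are hypothetical, and `repeat(..).take(n)` stands in for `std::iter::repeat_n` used in the diff.

```rust
/// Build flat index vectors for the Cartesian product
/// `left[l_start .. l_start + l_count) X right[0 .. right_rows)`.
/// Output row `i` pairs left row `left[i]` with right row `right[i]`.
fn cartesian_indices(l_start: usize, l_count: usize, right_rows: usize) -> (Vec<u32>, Vec<u32>) {
    // Each left row is repeated once per right row...
    let left: Vec<u32> = (0..l_count)
        .flat_map(|i| std::iter::repeat((l_start + i) as u32).take(right_rows))
        .collect();
    // ...while the right indices cycle through the whole right batch.
    let right: Vec<u32> = (0..l_count).flat_map(|_| 0..right_rows as u32).collect();
    (left, right)
}

/// Map a flat output position back to its absolute (left, right) pair.
fn pair_of(i: usize, l_start: usize, right_rows: usize) -> (usize, usize) {
    (l_start + i / right_rows, i % right_rows)
}

fn main() {
    // Left rows 5..8 joined with a 2-row right batch.
    let (left, right) = cartesian_indices(5, 3, 2);
    println!("{:?}", left); // [5, 5, 6, 6, 7, 7]
    println!("{:?}", right); // [0, 1, 0, 1, 0, 1]
    println!("{:?}", pair_of(3, 5, 2)); // (6, 1)
}
```

The `i / right_rows` and `i % right_rows` mapping is the same arithmetic the diff uses to translate a position in the filter mask back into left and right bitmap indices.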
## Are these changes tested?
This is covered by existing tests: the new path is triggered not only when the entire right input is small. In regular workloads, the final right input batch can also be very small, so existing tests exercise this path as well.
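To see why ordinary test inputs reach the new path, assume the right input arrives in batches of at most `batch_size` rows with a shorter final batch. This is a simplifying assumption, since real batch boundaries depend on upstream operators. The hypothetical helper below flags which batches satisfy the `batch_size / rows > 10` condition from the diff.

```rust
/// Hypothetical: split `total_rows` into batches of at most `batch_size`
/// rows and report, for each batch, whether the small-right-batch branch
/// (`batch_size / rows > 10`) would be taken.
fn small_batch_path_hits(total_rows: usize, batch_size: usize) -> Vec<(usize, bool)> {
    let mut out = Vec::new();
    let mut remaining = total_rows;
    while remaining > 0 {
        let rows = remaining.min(batch_size);
        out.push((rows, batch_size / rows > 10));
        remaining -= rows;
    }
    out
}

fn main() {
    // 16,500 rows: two full batches plus a 116-row tail (8192 / 116 = 70).
    println!("{:?}", small_batch_path_hits(16_500, 8192));
    // [(8192, false), (8192, false), (116, true)]
}
```

By the same arithmetic, a 20,000-row input leaves a 3,616-row final batch (8192 / 3616 = 2), which stays on the regular path, so coverage depends on the input sizes used by the existing tests.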
## Are there any user-facing changes?
---
.../physical-plan/src/joins/nested_loop_join.rs | 262 +++++++++++++++++++--
1 file changed, 247 insertions(+), 15 deletions(-)
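A plain-Rust model of the matched-bitmap bookkeeping that `process_left_range_join` performs in the diff, assuming `Vec<Option<bool>>` in place of an arrow `BooleanArray` and `Vec<bool>` in place of the bitmap builders. The names are hypothetical. Unlike the real code, which updates the left bitmap only when `need_produce_result_in_final` holds and the right bitmap only when `should_track_unmatched_right` is set, this sketch always updates both.

```rust
// Hypothetical model of the bookkeeping; not DataFusion's API.

/// NULL filter results count as "not matched" (e.g. `1 < NULL` is NULL).
fn mask_from_filter(filter: &[Option<bool>]) -> Vec<bool> {
    filter.iter().map(|v| v.unwrap_or(false)).collect()
}

/// Mark matched left rows (absolute indices) and OR this range's right
/// matches into the bitmap kept for the current right batch.
fn update_bitmaps(
    mask: &[bool],
    l_start: usize,
    right_rows: usize,
    left_bitmap: &mut [bool],
    right_bitmap: &mut [bool],
) {
    // Local right bitmap covering only this left range.
    let mut local_right = vec![false; right_rows];
    for (i, &matched) in mask.iter().enumerate() {
        if matched {
            left_bitmap[l_start + i / right_rows] = true;
            local_right[i % right_rows] = true;
        }
    }
    // A right row is matched if any processed left range matched it.
    for (g, l) in right_bitmap.iter_mut().zip(local_right) {
        *g |= l;
    }
}

fn main() {
    // Left range [1, 3) X 2 right rows -> 4 candidate pairs.
    let filter = [Some(true), None, Some(false), Some(false)];
    let mask = mask_from_filter(&filter);
    let mut left_bitmap = vec![false; 4];
    let mut right_bitmap = vec![false, true]; // right row 1 matched earlier
    update_bitmaps(&mask, 1, 2, &mut left_bitmap, &mut right_bitmap);
    println!("{:?}", left_bitmap); // [false, true, false, false]
    println!("{:?}", right_bitmap); // [true, true]
}
```

The OR-merge matters because one right batch is joined against the left side in several ranges; each call contributes only a partial view of which right rows have matched.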
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 9377ace33a..a6b3df8819 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -50,7 +50,7 @@ use crate::{
use arrow::array::{
new_null_array, Array, BooleanArray, BooleanBufferBuilder,
RecordBatchOptions,
- UInt64Array,
+ UInt32Array, UInt64Array,
};
use arrow::buffer::BooleanBuffer;
use arrow::compute::{
@@ -1269,11 +1269,51 @@ impl NestedLoopJoinStream {
// and push the result into output_buffer
// ========
+ // Special case:
+ // When the right batch is very small, join with multiple left rows at once,
+ //
+ // The regular implementation is not efficient if the plan's right child is
+ // very small (e.g. 1 row total), because inside the inner loop of NLJ, it's
+ // handling one input right batch at once, if it's not large enough, the
+ // overheads like filter evaluation can't be amortized through vectorization.
+ debug_assert_ne!(
+ right_batch.num_rows(),
+ 0,
+ "When fetching the right batch, empty batches will be skipped"
+ );
+
+ let l_row_cnt_ratio = self.batch_size / right_batch.num_rows();
+ if l_row_cnt_ratio > 10 {
+ // Calculate max left rows to handle at once. This operator tries to handle
+ // up to `datafusion.execution.batch_size` rows at once in the intermediate
+ // batch.
+ let l_row_count = std::cmp::min(
+ l_row_cnt_ratio,
+ left_data.batch().num_rows() - self.left_probe_idx,
+ );
+
+ debug_assert!(l_row_count != 0, "This function should only be entered when there are remaining left rows to process");
+ let joined_batch = self.process_left_range_join(
+ &left_data,
+ &right_batch,
+ self.left_probe_idx,
+ l_row_count,
+ )?;
+
+ if let Some(batch) = joined_batch {
+ self.output_buffer.push_batch(batch)?;
+ }
+
+ self.left_probe_idx += l_row_count;
+
+ return Ok(true);
+ }
+
let l_idx = self.left_probe_idx;
- let join_batch =
+ let joined_batch =
self.process_single_left_row_join(&left_data, &right_batch,
l_idx)?;
- if let Some(batch) = join_batch {
+ if let Some(batch) = joined_batch {
self.output_buffer.push_batch(batch)?;
}
@@ -1286,8 +1326,196 @@ impl NestedLoopJoinStream {
Ok(true)
}
+ /// Process [l_start_index, l_start_index + l_count) JOIN right_batch
+ /// Returns a RecordBatch containing the join results (None if empty)
+ ///
+ /// Side Effect: If the join type requires, left or right side matched bitmap
+ /// will be set for matched indices.
+ fn process_left_range_join(
+ &mut self,
+ left_data: &JoinLeftData,
+ right_batch: &RecordBatch,
+ l_start_index: usize,
+ l_row_count: usize,
+ ) -> Result<Option<RecordBatch>> {
+ // Construct the Cartesian product between the specified range of left rows
+ // and the entire right_batch. First, it calculates the index vectors, then
+ // materializes the intermediate batch, and finally applies the join filter
+ // to it.
+ // -----------------------------------------------------------
+ let right_rows = right_batch.num_rows();
+ let total_rows = l_row_count * right_rows;
+
+ // Build index arrays for cartesian product: left_range X right_batch
+ let left_indices: UInt32Array =
+ UInt32Array::from_iter_values((0..l_row_count).flat_map(|i| {
+ std::iter::repeat_n((l_start_index + i) as u32, right_rows)
+ }));
+ let right_indices: UInt32Array = UInt32Array::from_iter_values(
+ (0..l_row_count).flat_map(|_| 0..right_rows as u32),
+ );
+
+ debug_assert!(
+ left_indices.len() == right_indices.len()
+ && right_indices.len() == total_rows,
+ "The length or cartesian product should be (left_size * right_size)",
+ );
+
+ // Evaluate the join filter (if any) over an intermediate batch built
+ // using the filter's own schema/column indices.
+ let bitmap_combined = if let Some(filter) = &self.join_filter {
+ // Build the intermediate batch for filter evaluation
+ let intermediate_batch = if filter.schema.fields().is_empty() {
+ // Constant predicate (e.g., TRUE/FALSE). Use an empty schema with row_count
+ create_record_batch_with_empty_schema(
+ Arc::new((*filter.schema).clone()),
+ total_rows,
+ )?
+ } else {
+ let mut filter_columns: Vec<Arc<dyn Array>> =
+ Vec::with_capacity(filter.column_indices().len());
+ for column_index in filter.column_indices() {
+ let array = if column_index.side == JoinSide::Left {
+ let col = left_data.batch().column(column_index.index);
+ take(col.as_ref(), &left_indices, None)?
+ } else {
+ let col = right_batch.column(column_index.index);
+ take(col.as_ref(), &right_indices, None)?
+ };
+ filter_columns.push(array);
+ }
+
+ RecordBatch::try_new(Arc::new((*filter.schema).clone()), filter_columns)?
+ };
+
+ let filter_result = filter
+ .expression()
+ .evaluate(&intermediate_batch)?
+ .into_array(intermediate_batch.num_rows())?;
+ let filter_arr = as_boolean_array(&filter_result)?;
+
+ // Combine with null bitmap to get a unified mask
+ boolean_mask_from_filter(filter_arr)
+ } else {
+ // No filter: all pairs match
+ BooleanArray::from(vec![true; total_rows])
+ };
+
+ // Update the global left or right bitmap for matched indices
+ // -----------------------------------------------------------
+
+ // None means we don't have to update left bitmap for this join type
+ let mut left_bitmap = if need_produce_result_in_final(self.join_type) {
+ Some(left_data.bitmap().lock())
+ } else {
+ None
+ };
+
+ // 'local' meaning: we want to collect 'is_matched' flag for the current
+ // right batch, after it has joining all of the left buffer, here it's only
+ let mut local_right_bitmap = if self.should_track_unmatched_right {
+ let mut current_right_batch_bitmap = BooleanBufferBuilder::new(right_rows);
+ // Ensure builder has logical length so set_bit is in-bounds
+ current_right_batch_bitmap.append_n(right_rows, false);
+ Some(current_right_batch_bitmap)
+ } else {
+ None
+ };
+
+ // Set the matched bit for left and right side bitmap
+ for (i, is_matched) in bitmap_combined.iter().enumerate() {
+ let is_matched = is_matched.ok_or_else(|| {
+ internal_datafusion_err!("Must be Some after the previous combining step")
+ })?;
+
+ let l_index = l_start_index + i / right_rows;
+ let r_index = i % right_rows;
+
+ if let Some(bitmap) = left_bitmap.as_mut() {
+ if is_matched {
+ // Map local index back to absolute left index within the batch
+ bitmap.set_bit(l_index, true);
+ }
+ }
+
+ if let Some(bitmap) = local_right_bitmap.as_mut() {
+ if is_matched {
+ bitmap.set_bit(r_index, true);
+ }
+ }
+ }
+
+ // Apply the local right bitmap to the global bitmap
+ if self.should_track_unmatched_right {
+ // Remember to put it back after update
+ let global_right_bitmap =
+ std::mem::take(&mut self.current_right_batch_matched).ok_or_else(
+ || internal_datafusion_err!("right batch's bitmap should be present"),
+ )?;
+ let (buf, nulls) = global_right_bitmap.into_parts();
+ debug_assert!(nulls.is_none());
+
+ let current_right_bitmap = local_right_bitmap
+ .ok_or_else(|| {
+ internal_datafusion_err!(
+ "Should be Some if the current join type requires right bitmap"
+ )
+ })?
+ .finish();
+ let updated_global_right_bitmap = buf.bitor(&current_right_bitmap);
+
+ self.current_right_batch_matched =
+ Some(BooleanArray::new(updated_global_right_bitmap, None));
+ }
+
+ // For the following join types: only bitmaps are updated; do not emit rows now
+ if matches!(
+ self.join_type,
+ JoinType::LeftAnti
+ | JoinType::LeftSemi
+ | JoinType::LeftMark
+ | JoinType::RightAnti
+ | JoinType::RightMark
+ | JoinType::RightSemi
+ ) {
+ return Ok(None);
+ }
+
+ // Build the projected output batch (using output schema/column_indices),
+ // then apply the bitmap filter to it.
+ if self.output_schema.fields().is_empty() {
+ // Empty projection: only row count matters
+ let row_count = bitmap_combined.true_count();
+ return Ok(Some(create_record_batch_with_empty_schema(
+ Arc::clone(&self.output_schema),
+ row_count,
+ )?));
+ }
+
+ let mut out_columns: Vec<Arc<dyn Array>> =
+ Vec::with_capacity(self.output_schema.fields().len());
+ for column_index in &self.column_indices {
+ let array = if column_index.side == JoinSide::Left {
+ let col = left_data.batch().column(column_index.index);
+ take(col.as_ref(), &left_indices, None)?
+ } else {
+ let col = right_batch.column(column_index.index);
+ take(col.as_ref(), &right_indices, None)?
+ };
+ out_columns.push(array);
+ }
+ let pre_filtered =
+ RecordBatch::try_new(Arc::clone(&self.output_schema), out_columns)?;
+ let filtered = filter_record_batch(&pre_filtered, &bitmap_combined)?;
+ Ok(Some(filtered))
+ }
+
/// Process a single left row join with the current right batch.
/// Returns a RecordBatch containing the join results (None if empty)
+ ///
+ /// Side Effect: If the join type requires, left or right side matched bitmap
+ /// will be set for matched indices.
fn process_single_left_row_join(
&mut self,
left_data: &JoinLeftData,
@@ -1584,22 +1812,26 @@ fn apply_filter_to_row_join_batch(
.into_array(intermediate_batch.num_rows())?;
let filter_arr = as_boolean_array(&filter_result)?;
- // [Caution] This step has previously introduced bugs
- // The filter result is NOT a bitmap; it contains true/false/null values.
- // For example, 1 < NULL is evaluated to NULL. Therefore, we must combine (AND)
- // the boolean array with its null bitmap to construct a unified bitmap.
- let (is_filtered, nulls) = filter_arr.clone().into_parts();
- let bitmap_combined = match nulls {
- Some(nulls) => {
- let combined = nulls.inner() & &is_filtered;
- BooleanArray::new(combined, None)
- }
- None => BooleanArray::new(is_filtered, None),
- };
+ // Convert boolean array with potential nulls into a unified mask bitmap
+ let bitmap_combined = boolean_mask_from_filter(filter_arr);
Ok(bitmap_combined)
}
+/// Convert a boolean filter array into a unified mask bitmap.
+///
+/// Caution: The filter result is NOT a bitmap; it contains true/false/null values.
+/// For example, `1 < NULL` evaluates to NULL. Therefore, we must combine (AND)
+/// the boolean array with its null bitmap to construct a unified bitmap.
+#[inline]
+fn boolean_mask_from_filter(filter_arr: &BooleanArray) -> BooleanArray {
+ let (values, nulls) = filter_arr.clone().into_parts();
+ match nulls {
+ Some(nulls) => BooleanArray::new(nulls.inner() & &values, None),
+ None => BooleanArray::new(values, None),
+ }
+}
+
/// This function performs the following steps:
/// 1. Apply filter to probe-side batch
/// 2. Broadcast the left row (build_side_batch\[build_side_index\]) to the