2010YOUY01 commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2199423074
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
handle_state!(self.process_probe_batch())
}
NestedLoopJoinStreamState::ExhaustedProbeSide => {
- handle_state!(self.process_unmatched_build_batch())
+ handle_state!(self.prepare_unmatched_output_indices())
+ }
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+ handle_state!(self.build_unmatched_output())
}
NestedLoopJoinStreamState::Completed => Poll::Ready(None),
};
}
}
+ fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+ let (left_indices, right_indices, start) =
+ self.join_result_status.as_mut().ok_or_else(|| {
+ datafusion_common::_internal_datafusion_err!(
+ "should have join_result_status"
+ )
+ })?;
+
+ let left_batch = self
+ .left_data
+ .as_ref()
+ .ok_or_else(|| {
+ datafusion_common::_internal_datafusion_err!("should have
left_batch")
+ })?
+ .batch();
+
+ let right_batch = match &self.state {
+ NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) =>
record_batch,
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch)
=> {
+ record_batch
+ }
+ _ => {
+ return internal_err!(
+ "state should be ProcessProbeBatch or OutputUnmatchBatch"
+ )
+ }
+ };
+
+ let current_start = *start;
+
+ if left_indices.is_empty() && right_indices.is_empty() &&
current_start == 0 {
Review Comment:
I don't get this `status.processed_count = 1` logic either; perhaps you can
add a quick comment to explain it?
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,15 +845,125 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
let poll = handle_state!(self.process_probe_batch());
self.join_metrics.baseline.record_poll(poll)
}
- NestedLoopJoinStreamState::ExhaustedProbeSide => {
- let poll =
handle_state!(self.process_unmatched_build_batch());
+ NestedLoopJoinStreamState::PrepareUnmatchedBuildRows => {
+ handle_state!(self.prepare_unmatched_output_indices())
+ }
+ NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+ let poll = handle_state!(self.build_unmatched_output());
self.join_metrics.baseline.record_poll(poll)
}
NestedLoopJoinStreamState::Completed => Poll::Ready(None),
};
}
}
+ // This function's main job is to construct an output `RecordBatch` based
on pre-calculated join indices.
+ // It operates in a chunk-based manner, meaning it processes a portion of
the results in each call,
+ // making it suitable for streaming large datasets without high memory
consumption.
+ fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+ let status = self.join_result_status.as_mut().ok_or_else(|| {
+ internal_datafusion_err!(
+ "get_next_join_result called without initializing
join_result_status"
+ )
+ })?;
+
+ let (left_indices, right_indices, current_start) = (
+ &status.build_indices,
+ &status.probe_indices,
+ status.processed_count,
+ );
+
+ let left_batch = self
+ .left_data
+ .as_ref()
+ .ok_or_else(|| internal_datafusion_err!("should have left_batch"))?
+ .batch();
+
+ let right_batch = match &self.state {
+ NestedLoopJoinStreamState::ProcessProbeBatch(record_batch)
+ |
NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => {
+ record_batch
+ }
+ _ => {
+ return internal_err!(
+ "State should be ProcessProbeBatch or
OutputUnmatchedBuildRows"
+ )
+ }
+ };
+
+ if left_indices.is_empty() && right_indices.is_empty() &&
current_start == 0 {
+ let res = RecordBatch::new_empty(Arc::clone(&self.schema));
+ status.processed_count = 1;
+ return Ok(Some(res));
+ }
+
+ if matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti)
{
Review Comment:
I think from here to the end of the function, it would look nicer if we
structured it this way:
```rust
match self.join_type {
JoinType::RightSemi | JoinType::RightAnti => {...}
JoinType::RightMark => {...}
JoinType::......(others) => {}
_ => {unreachable!()}
}
```
##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -705,8 +696,29 @@ impl NestedLoopJoinStreamState {
}
}
+/// Tracks incremental output of join result batches.
+///
+/// Initialized with all matching pairs that satisfy the join predicate.
+/// Pairs are stored as indices in `build_indices` and `probe_indices`
+/// Each poll outputs a batch within the configured size limit and updates
+/// processed_count until all pairs are consumed.
+///
+/// Example: 5000 matches, batch size limit is 100
+/// - Poll 1: output batch[0..100], processed_count = 100
+/// - Poll 2: output batch[100..200], processed_count = 200
+/// - ...continues until processed_count = 5000
+struct JoinResultStatus {
Review Comment:
nit: `Status` is most commonly used for error codes / state flags; perhaps we
can use `JoinResultProgress` here to avoid confusion?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]