kumarUjjawal commented on code in PR #22230:
URL: https://github.com/apache/datafusion/pull/22230#discussion_r3275199037
##########
datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs:
##########
@@ -785,24 +794,40 @@ impl BitwiseSortMergeJoinStream {
)
.count_ones();
- // Process spilled inner batches first (read back from disk).
- if let Some(spill_file) = &self.inner_key_spill {
- let file = BufReader::new(File::open(spill_file.path())?);
- let reader = StreamReader::try_new(file, None)?;
- for batch_result in reader {
- let inner_slice = batch_result?;
- matched_count = eval_filter_for_inner_slice(
- self.outer_is_left,
- filter,
- &outer_slice,
- &inner_slice,
- &mut self.matched,
- self.outer_offset,
- outer_group_len,
- matched_count,
- )?;
- if matched_count == outer_group_len {
- break;
+ // Process spilled inner batches first asynchronously.
+ if self.inner_key_spill.is_some() || self.spill_stream.is_some() {
+ if self.spill_stream.is_none()
+ && let Some(spill_file) = &self.inner_key_spill
+ {
+ let stream = self
+ .spill_manager
+ .read_spill_as_stream(spill_file.clone(), None)?;
+ self.spill_stream = Some(stream);
+ }
+
+ while matched_count < outer_group_len {
+ let stream = self.spill_stream.as_mut().unwrap();
+ match ready!(stream.poll_next_unpin(cx)) {
+ Some(Ok(inner_slice)) => {
+ matched_count = eval_filter_for_inner_slice(
+ self.outer_is_left,
+ filter,
+ &outer_slice,
+ &inner_slice,
+ &mut self.matched,
+ self.outer_offset,
+ outer_group_len,
+ matched_count,
+ )?;
+ }
+ Some(Err(e)) => {
+ self.spill_stream = None;
+ return Poll::Ready(Err(e));
+ }
+ None => {
Review Comment:
Can we make the empty-spill invariant consistent here? In the other async
spill-read path, `None` becomes `internal_err!("Spill file was empty")`, but here
we treat `None` as normal EOF and just break. Since these spill files are written
from non-empty buffered data, I think this should either return an error here as
well, or we should document why an empty spill stream is valid in this path.
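
A minimal, std-only sketch of the first option, under one possible reading of the invariant: a spill reader that ends before yielding any batch is reported as an error instead of being treated as EOF. `Batch` and `drain_spill` are hypothetical names for illustration, and the `String` error stands in for what `internal_err!` would produce in DataFusion.

```rust
// Hypothetical stand-in for a spilled record batch; not a DataFusion type.
#[derive(Debug, PartialEq)]
struct Batch(Vec<i32>);

/// Drains a spill reader. A reader that ends before yielding any batch is
/// treated as an invariant violation (assumption: spill files are only ever
/// written from non-empty buffered data), not as a normal end of stream.
fn drain_spill<I>(reader: I) -> Result<Vec<Batch>, String>
where
    I: IntoIterator<Item = Result<Batch, String>>,
{
    let mut batches = Vec::new();
    for item in reader {
        // Propagate read errors unchanged.
        batches.push(item?);
    }
    if batches.is_empty() {
        // Stand-in for `internal_err!("Spill file was empty")`.
        return Err("Internal error: Spill file was empty".to_string());
    }
    Ok(batches)
}
```

If empty spill streams turn out to be legitimate in this path, the same spot is where a comment explaining why would go.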
##########
datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs:
##########
@@ -1603,29 +1707,18 @@ impl MaterializingSortMergeJoinStream {
let num_right_cols = self.buffered_schema.fields().len();
// Read each source batch once (spilled batches require disk I/O).
- // Track memory for each spilled batch at the point of deserialization
- // so the pool reflects actual usage as it grows.
- let spill_reservation = self.reservation.new_empty();
- let mut source_data: Vec<Option<RecordBatch>> =
- Vec::with_capacity(source_batches.len());
- for &idx in &source_batches {
- let bb = &self.buffered_data.batches[idx];
- match &bb.batch {
- BufferedBatchState::InMemory(batch) => {
- source_data.push(Some(batch.clone()));
- }
- BufferedBatchState::Spilled(spill_file) => {
- spill_reservation.grow(bb.size_estimation);
- self.join_metrics
- .peak_mem_used()
- .set_max(self.reservation.size() + spill_reservation.size());
-
- let file = BufReader::new(File::open(spill_file.path())?);
- let reader = StreamReader::try_new(file, None)?;
- source_data.push(reader.into_iter().next().transpose()?);
+ let source_data: Vec<RecordBatch> = source_batches
+ .iter()
+ .map(|&idx| {
+ let bb = &self.buffered_data.batches[idx];
+ match &bb.batch {
+ BufferedBatchState::InMemory(batch) => batch.clone(),
+ BufferedBatchState::Spilled(_) => {
+ unreachable!("Batches were unspilled")
Review Comment:
I’d prefer `internal_err!` here instead of `unreachable!`. The same "should
have been unspilled already" precondition in
`fetch_right_columns_from_batch_by_idxs` already returns a regular internal
error. If a future caller misses the unspill step, failing the query is much
better than panicking the executor.
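
A rough std-only sketch of the shape I have in mind: the closure returns a `Result`, and `collect` short-circuits on the first batch that is still spilled. `BatchState` and `collect_in_memory` are hypothetical stand-ins, not the actual DataFusion types, and the `String` error stands in for the `internal_err!` result.

```rust
// Hypothetical stand-in for `BufferedBatchState`; payloads are simplified.
#[derive(Debug, Clone, PartialEq)]
enum BatchState {
    InMemory(Vec<i32>),
    Spilled(String), // path of the spill file (illustrative only)
}

/// Collects the requested batches, returning an error (rather than panicking)
/// if any of them has not been read back into memory yet.
fn collect_in_memory(
    states: &[BatchState],
    indices: &[usize],
) -> Result<Vec<Vec<i32>>, String> {
    indices
        .iter()
        .map(|&idx| match &states[idx] {
            BatchState::InMemory(batch) => Ok(batch.clone()),
            // Stand-in for `internal_err!(...)`: fail the query, keep the executor alive.
            BatchState::Spilled(path) => Err(format!(
                "Internal error: batch {idx} is still spilled at {path}"
            )),
        })
        // Collecting into `Result<Vec<_>, _>` stops at the first error.
        .collect()
}
```

The only change for the caller is a `?` on the collected result.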
##########
datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs:
##########
@@ -917,6 +943,81 @@ impl MaterializingSortMergeJoinStream {
Poll::Pending
}
+ /// Identifies which buffered batches are needed for the upcoming freeze operation
+ fn get_required_batch_indices(&self, buffered_freeze_count: usize) -> Vec<usize> {
+ let mut needed = vec![];
+
+ // We need all batches that matched with streamed rows
+ for chunk in &self.streamed_batch.output_indices {
+ if let Some(idx) = chunk.buffered_batch_idx {
+ needed.push(idx);
+ }
+ }
+
+ // Full Joins need to emit null-joined rows, so we need batches up to freeze_count
+ if self.join_type == JoinType::Full {
+ needed.extend(0..buffered_freeze_count);
+ }
+
+ needed.sort_unstable();
+ needed.dedup();
+ needed
+ }
+
+ /// Asynchronously reads spilled batches back into memory.
+ /// Only processes the required indices to avoid OOMs.
+ fn poll_spilled_batches(
+ &mut self,
+ cx: &mut Context<'_>,
+ required_indices: &[usize],
+ ) -> Poll<Result<()>> {
+ for &idx in required_indices {
+ // Guard against indices that might be out of bounds if the queue was cleared
+ if idx >= self.buffered_data.batches.len() {
+ continue;
+ }
+
+ let bb = &mut self.buffered_data.batches[idx];
+
+ if let BufferedBatchState::Spilled(spill_file) = &bb.batch {
+ if self.spill_stream.is_none() {
+ let stream = self
+ .spill_manager
+ .read_spill_as_stream(spill_file.clone(), None)?;
+ self.spill_stream = Some(stream);
+ }
+
+ match ready!(self.spill_stream.as_mut().unwrap().poll_next_unpin(cx)) {
Review Comment:
I think this still needs dedicated coverage for `Poll::Pending` from the
spill-read stream itself. The existing `PendingStream` tests only inject pending
on the outer/inner input streams, so they do not exercise resuming from
`read_spill_as_stream()` here or the matching async path in `bitwise_stream`. I
would add a test that forces the spill stream to yield `Pending` mid-read.
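
To illustrate the kind of test I mean, here is a std-only sketch, not wired into the real join streams. `PendingEveryOther`, `Consumer`, and `drive` are hypothetical names; a real test would presumably wrap the stream returned by `read_spill_as_stream()` in something similar and compare the join output against the non-pending run.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical spill-read stand-in: returns `Pending` once before every item
/// (including the final `None`), so the consumer must resume mid-read.
struct PendingEveryOther {
    items: VecDeque<i32>,
    ready: bool,
}

impl PendingEveryOther {
    fn new(items: Vec<i32>) -> Self {
        Self { items: items.into(), ready: false }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<i32>> {
        if !self.ready {
            self.ready = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending;
        }
        self.ready = false;
        Poll::Ready(self.items.pop_front())
    }
}

/// Resumable consumer: progress (`sum`) lives in the struct, so it survives
/// returning `Pending` and being polled again.
struct Consumer {
    stream: PendingEveryOther,
    sum: i32,
}

impl Consumer {
    fn poll_sum(&mut self, cx: &mut Context<'_>) -> Poll<i32> {
        loop {
            match self.stream.poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(v)) => self.sum += v,
                Poll::Ready(None) => return Poll::Ready(self.sum),
            }
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls until ready; returns the result and how many `Pending`s were seen.
fn drive(consumer: &mut Consumer) -> (i32, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pendings = 0;
    loop {
        match consumer.poll_sum(&mut cx) {
            Poll::Ready(v) => return (v, pendings),
            Poll::Pending => pendings += 1,
        }
    }
}
```

The property worth asserting is that the result after several `Pending` returns equals the result of an uninterrupted read, which fails if any progress is kept only in locals that are lost when the poll function returns.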
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]