pantShrey commented on code in PR #22230:
URL: https://github.com/apache/datafusion/pull/22230#discussion_r3281058894
##########
datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs:
##########
@@ -917,6 +943,81 @@ impl MaterializingSortMergeJoinStream {
Poll::Pending
}
+ /// Identifies which buffered batches are needed for the upcoming freeze operation
+ fn get_required_batch_indices(&self, buffered_freeze_count: usize) -> Vec<usize> {
+ let mut needed = vec![];
+
+ // We need all batches that matched with streamed rows
+ for chunk in &self.streamed_batch.output_indices {
+ if let Some(idx) = chunk.buffered_batch_idx {
+ needed.push(idx);
+ }
+ }
+
+ // Full Joins need to emit null-joined rows, so we need batches up to freeze_count
+ if self.join_type == JoinType::Full {
+ needed.extend(0..buffered_freeze_count);
+ }
+
+ needed.sort_unstable();
+ needed.dedup();
+ needed
+ }
+
+ /// Asynchronously reads spilled batches back into memory.
+ /// Only processes the required indices to avoid OOMs.
+ fn poll_spilled_batches(
+ &mut self,
+ cx: &mut Context<'_>,
+ required_indices: &[usize],
+ ) -> Poll<Result<()>> {
+ for &idx in required_indices {
+ // Guard against indices that might be out of bounds if the queue was cleared
+ if idx >= self.buffered_data.batches.len() {
+ continue;
+ }
+
+ let bb = &mut self.buffered_data.batches[idx];
+
+ if let BufferedBatchState::Spilled(spill_file) = &bb.batch {
+ if self.spill_stream.is_none() {
+ let stream = self
+ .spill_manager
+ .read_spill_as_stream(spill_file.clone(), None)?;
+ self.spill_stream = Some(stream);
+ }
+
+ match ready!(self.spill_stream.as_mut().unwrap().poll_next_unpin(cx)) {
Review Comment:
I completely agree this needs coverage. I tried to build a mock for this
locally, but the challenge is that `read_spill_as_stream` currently
instantiates a real file stream internally, leaving no clean injection point.
This would become much cleaner once #21882 lands: at that point a
`MockSpillFile` can inject `Poll::Pending` cleanly without touching internals.
Would it be alright to handle this test as a follow-up once that PR lands?
Happy to open a tracking issue now if that's preferred.
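For illustration, here is a minimal, std-only sketch of the follow-up test's shape, assuming the injected mock behaves as described (a scripted stream that returns `Poll::Pending` once before yielding a batch). Every name in it (`SpillScript`, `BatchState`, `MockSpillStream`, `Reloader`, `CountingWaker`) is hypothetical. It does not use DataFusion's `SpillManager` or the `MockSpillFile` from #21882, and batches are modelled as `Vec<i32>` rather than `RecordBatch`. `poll_reload` is only a simplified analogue of the loop in `poll_spilled_batches`.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical scripted spill file: `None` = "I/O not ready", `Some(b)` = next batch.
type SpillScript = Vec<Option<Vec<i32>>>;

#[derive(Debug, Clone, PartialEq)]
enum BatchState {
    InMemory(Vec<i32>),
    Spilled(SpillScript),
}

/// Hypothetical mock of the stream a spill reader would return.
struct MockSpillStream {
    script: VecDeque<Option<Vec<i32>>>,
}

impl MockSpillStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<i32>>> {
        match self.script.pop_front() {
            // Injected "not ready": register a wake-up, as a real reader would.
            Some(None) => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Some(Some(batch)) => Poll::Ready(Some(batch)),
            None => Poll::Ready(None), // script exhausted: end of stream
        }
    }
}

struct Reloader {
    batches: Vec<BatchState>,
    // Kept across polls so a Pending read resumes instead of reopening the spill.
    spill_stream: Option<MockSpillStream>,
}

impl Reloader {
    fn poll_reload(&mut self, cx: &mut Context<'_>, required: &[usize]) -> Poll<Result<(), String>> {
        for &idx in required {
            let script = match self.batches.get(idx) {
                Some(BatchState::Spilled(s)) => s.clone(),
                _ => continue, // out of range or already in memory
            };
            if self.spill_stream.is_none() {
                self.spill_stream = Some(MockSpillStream { script: script.into() });
            }
            let next = match self.spill_stream.as_mut().unwrap().poll_next(cx) {
                // Same effect as `ready!`: bubble Pending up, keep the open stream.
                Poll::Pending => return Poll::Pending,
                Poll::Ready(next) => next,
            };
            self.spill_stream = None;
            match next {
                Some(batch) => self.batches[idx] = BatchState::InMemory(batch),
                None => return Poll::Ready(Err(format!("spill for batch {idx} was empty"))),
            }
        }
        Poll::Ready(Ok(()))
    }
}

/// Test waker that counts wake-ups, to check that Pending registered one.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut r = Reloader {
        batches: vec![
            BatchState::InMemory(vec![1]),
            BatchState::Spilled(vec![None, Some(vec![2, 3])]), // Pending once, then data
        ],
        spill_stream: None,
    };
    // First poll hits the injected Pending and must propagate it.
    assert!(r.poll_reload(&mut cx, &[0, 1, 7]).is_pending());
    assert!(r.spill_stream.is_some());
    // Second poll resumes the same stream and completes the reload.
    assert_eq!(r.poll_reload(&mut cx, &[0, 1, 7]), Poll::Ready(Ok(())));
    assert_eq!(r.batches[1], BatchState::InMemory(vec![2, 3]));
    assert_eq!(counter.0.load(Ordering::SeqCst), 1);
}
```

The property this pins down is that the open stream survives a `Pending`. If the reload reopened the spill on every call, the scripted `None` would replay each time and the second poll would return `Pending` again, so the test would fail.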
--
This is an automated message from the Apache Git Service.