mbutrovich commented on code in PR #22230:
URL: https://github.com/apache/datafusion/pull/22230#discussion_r3383772961
##########
datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs:
##########
@@ -917,6 +943,81 @@ impl MaterializingSortMergeJoinStream {
Poll::Pending
}
+ /// Identifies which buffered batches are needed for the upcoming freeze operation
+ fn get_required_batch_indices(&self, buffered_freeze_count: usize) -> Vec<usize> {
+ let mut needed = vec![];
+
+ // We need all batches that matched with streamed rows
+ for chunk in &self.streamed_batch.output_indices {
+ if let Some(idx) = chunk.buffered_batch_idx {
+ needed.push(idx);
+ }
+ }
+
+ // Full Joins need to emit null-joined rows, so we need batches up to freeze_count
+ if self.join_type == JoinType::Full {
+ needed.extend(0..buffered_freeze_count);
+ }
+
+ needed.sort_unstable();
+ needed.dedup();
+ needed
+ }
+
+ /// Asynchronously reads spilled batches back into memory.
+ /// Only processes the required indices to avoid OOMs.
+ fn poll_spilled_batches(
+ &mut self,
+ cx: &mut Context<'_>,
+ required_indices: &[usize],
+ ) -> Poll<Result<()>> {
+ for &idx in required_indices {
+ // Guard against indices that might be out of bounds if the queue was cleared
+ if idx >= self.buffered_data.batches.len() {
+ continue;
+ }
+
+ let bb = &mut self.buffered_data.batches[idx];
+
+ if let BufferedBatchState::Spilled(spill_file) = &bb.batch {
+ if self.spill_stream.is_none() {
+ let stream = self
+ .spill_manager
+ .read_spill_as_stream(spill_file.clone(), None)?;
+ self.spill_stream = Some(stream);
+ }
+
+ match ready!(self.spill_stream.as_mut().unwrap().poll_next_unpin(cx)) {
Review Comment:
Could you open a tracking issue for this one? I don't want it to get lost
and it's the only way to really exercise the most important logic in this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]