kosiew commented on code in PR #22103:
URL: https://github.com/apache/datafusion/pull/22103#discussion_r3239791041
##########
datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs:
##########
@@ -1577,20 +1599,31 @@ impl MaterializingSortMergeJoinStream {
let mut right_columns = Vec::with_capacity(num_right_cols);
// Read each source batch once (spilled batches require disk I/O).
- let source_data: Vec<Option<RecordBatch>> = source_batches
- .iter()
- .map(|&idx| {
- let bb = &self.buffered_data.batches[idx];
- match &bb.batch {
- BufferedBatchState::InMemory(batch) => Some(batch.clone()),
- BufferedBatchState::Spilled(spill_file) => {
- let file =
- BufReader::new(File::open(spill_file.path()).ok()?);
- let reader = StreamReader::try_new(file, None).ok()?;
- reader.into_iter().next()?.ok()
- }
+ // Track memory for each spilled batch at the point of deserialization
+ // so the pool reflects actual usage as it grows.
+ let mut spill_read_mem: usize = 0;
+ let mut source_data: Vec<Option<RecordBatch>> =
+ Vec::with_capacity(source_batches.len());
+ for &idx in &source_batches {
+ let bb = &self.buffered_data.batches[idx];
+ match &bb.batch {
+ BufferedBatchState::InMemory(batch) => {
+ source_data.push(Some(batch.clone()));
}
- })
- .collect();
+ BufferedBatchState::Spilled(spill_file) => {
+ let batch_mem = bb.size_estimation;
+ self.reservation.grow(batch_mem);
+ self.join_metrics
+ .peak_mem_used()
+ .set_max(self.reservation.size());
+ spill_read_mem += batch_mem;
+
+ let file = BufReader::new(File::open(spill_file.path())?);
Review Comment:
It looks like the reservation is only shrunk by `spill_read_mem` after all
file reads and all `interleave` calls succeed. Once
`self.reservation.grow(batch_mem)` has run, any later `?` (from `File::open`,
`StreamReader::try_new`, `next().transpose()`, or `interleave`) can return
early, before the shrink at line 1651 is reached.
That leaves the reservation inflated until the stream is dropped, which
breaks the grow/shrink accounting invariant on error paths and can leave the
memory pool reporting stale reserved memory after a failed poll.
Could we make this temporary read-back reservation scoped/RAII-based, or
otherwise guarantee that `shrink` runs on every return path after a successful
`grow`?
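
To illustrate the scoped option, here is a minimal sketch rather than a drop-in patch. `Reservation` is a hypothetical stand-in that models only the grow/shrink/size bookkeeping, and `ScopedGrow`, `simulated_read`, and `read_back` are illustrative names that do not exist in DataFusion. The idea is that the guard remembers how many bytes were grown through it and gives them back in `Drop`, so an early `?` return cannot skip the shrink.

```rust
use std::io;

/// Hypothetical stand-in for the operator's memory reservation.
/// Only the grow/shrink/size bookkeeping is modeled.
#[derive(Debug, Default)]
pub struct Reservation {
    size: usize,
}

impl Reservation {
    pub fn grow(&mut self, bytes: usize) {
        self.size += bytes;
    }
    pub fn shrink(&mut self, bytes: usize) {
        self.size -= bytes;
    }
    pub fn size(&self) -> usize {
        self.size
    }
}

/// Hypothetical guard: records every byte grown through it and
/// shrinks the reservation by that amount when dropped.
pub struct ScopedGrow<'a> {
    reservation: &'a mut Reservation,
    grown: usize,
}

impl<'a> ScopedGrow<'a> {
    pub fn new(reservation: &'a mut Reservation) -> Self {
        Self { reservation, grown: 0 }
    }
    pub fn grow(&mut self, bytes: usize) {
        self.reservation.grow(bytes);
        self.grown += bytes;
    }
    pub fn size(&self) -> usize {
        self.reservation.size()
    }
}

impl Drop for ScopedGrow<'_> {
    fn drop(&mut self) {
        // Runs on normal exit and on every early return via `?`.
        self.reservation.shrink(self.grown);
    }
}

/// Stand-in for opening and decoding a spill file.
fn simulated_read(ok: bool) -> io::Result<()> {
    if ok {
        Ok(())
    } else {
        Err(io::Error::new(io::ErrorKind::Other, "simulated spill read failure"))
    }
}

/// Simulated read-back loop. Each entry is (size estimate, read succeeds).
/// Returns the peak reservation size observed while reading.
pub fn read_back(
    reservation: &mut Reservation,
    batches: &[(usize, bool)],
) -> io::Result<usize> {
    let mut guard = ScopedGrow::new(reservation);
    let mut peak = 0;
    for &(bytes, ok) in batches {
        guard.grow(bytes);
        peak = peak.max(guard.size());
        simulated_read(ok)?; // early return; the guard still shrinks
    }
    Ok(peak)
}
```

With this shape, a call such as `read_back(&mut r, &[(10, true), (20, false)])` returns `Err` and leaves `r.size()` at whatever it was before the call. In the real stream the guard would need to borrow `self.reservation` while other fields of `self` are still in use, so the exact borrowing arrangement is left open here.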
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]