pantShrey commented on code in PR #22230:
URL: https://github.com/apache/datafusion/pull/22230#discussion_r3280993470


##########
datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs:
##########
@@ -1603,29 +1707,18 @@ impl MaterializingSortMergeJoinStream {
         let num_right_cols = self.buffered_schema.fields().len();
 
         // Read each source batch once (spilled batches require disk I/O).
-        // Track memory for each spilled batch at the point of deserialization
-        // so the pool reflects actual usage as it grows.
-        let spill_reservation = self.reservation.new_empty();
-        let mut source_data: Vec<Option<RecordBatch>> =
-            Vec::with_capacity(source_batches.len());
-        for &idx in &source_batches {
-            let bb = &self.buffered_data.batches[idx];
-            match &bb.batch {
-                BufferedBatchState::InMemory(batch) => {
-                    source_data.push(Some(batch.clone()));
-                }
-                BufferedBatchState::Spilled(spill_file) => {
-                    spill_reservation.grow(bb.size_estimation);
-                    self.join_metrics
-                        .peak_mem_used()
-                        .set_max(self.reservation.size() + 
spill_reservation.size());
-
-                    let file = BufReader::new(File::open(spill_file.path())?);
-                    let reader = StreamReader::try_new(file, None)?;
-                    source_data.push(reader.into_iter().next().transpose()?);
+        let source_data: Vec<RecordBatch> = source_batches
+            .iter()
+            .map(|&idx| {
+                let bb = &self.buffered_data.batches[idx];
+                match &bb.batch {
+                    BufferedBatchState::InMemory(batch) => batch.clone(),
+                    BufferedBatchState::Spilled(_) => {
+                        unreachable!("Batches were unspilled")

Review Comment:
   Fixed: replaced `unreachable!` with `internal_err!` so that the query fails
gracefully with an error instead of panicking the executor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to