Kontinuation commented on code in PR #14644:
URL: https://github.com/apache/datafusion/pull/14644#discussion_r1955893819


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -408,50 +395,114 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst 
inserting");
 
-        self.in_mem_sort().await?;
-
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
-        let spilled_rows = spill_record_batches(
+        let (spilled_rows, spilled_bytes) = spill_record_batches(
             batches,
             spill_file.path().into(),
             Arc::clone(&self.schema),
         )?;
         let used = self.reservation.free();
         self.metrics.spill_count.add(1);
-        self.metrics.spilled_bytes.add(used);
+        self.metrics.spilled_bytes.add(spilled_bytes);
         self.metrics.spilled_rows.add(spilled_rows);
         self.spills.push(spill_file);
         Ok(used)
     }
 
     /// Sorts the in_mem_batches in place
-    async fn in_mem_sort(&mut self) -> Result<()> {
-        if self.in_mem_batches_sorted {
-            return Ok(());
-        }
-
+    ///
+    /// Sorting may have freed memory, especially if fetch is `Some`. If
+    /// the memory usage has dropped by a factor of 2, then we don't have
+    /// to spill. Otherwise, we spill to free up memory for inserting
+    /// more batches.
+    ///
+    /// The factor of 2 aims to avoid a degenerate case where the
+    /// memory required for `fetch` is just under the memory available,
+    // causing repeated re-sorting of data
+    async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
         // Release the memory reserved for merge back to the pool so
-        // there is some left when `in_memo_sort_stream` requests an
+        // there is some left when `in_mem_sort_stream` requests an
         // allocation.
         self.merge_reservation.free();
 
-        self.in_mem_batches = self
-            .in_mem_sort_stream(self.metrics.baseline.intermediate())?
-            .try_collect()
-            .await?;
+        let before = self.reservation.size();
+
+        let mut sorted_stream =
+            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+
+        // `self.in_mem_batches` is already taken away by the sort_stream, now 
it is empty.
+        // We'll gradually collect the sorted stream into self.in_mem_batches, 
or directly
+        // write sorted batches to disk when the memory is insufficient.
+        let mut spill_writer: Option<IPCWriter> = None;
+        // Leave at least 1/3 of spill reservation for sort/merge the next 
batch. Here the
+        // 1/3 is simply an arbitrary chosen number.
+        let sort_merge_minimum_overhead = self.sort_spill_reservation_bytes / 
3;
+        while let Some(batch) = sorted_stream.next().await {
+            let batch = batch?;
+            match &mut spill_writer {
+                None => {
+                    let sorted_size = 
get_reserved_byte_for_record_batch(&batch);
+
+                    // We reserve more memory to ensure that we'll have enough 
memory for
+                    // `SortPreservingMergeStream` after consuming this batch, 
otherwise we'll
+                    // start spilling everything to disk.
+                    if self
+                        .reservation
+                        .try_grow(sorted_size + sort_merge_minimum_overhead)
+                        .is_err()
+                    {
+                        // Directly write in_mem_batches as well as all the 
remaining batches in
+                        // sorted_stream to disk. Further batches fetched from 
`sorted_stream` will
+                        // be handled by the `Some(writer)` matching arm.
+                        let spill_file =
+                            
self.runtime.disk_manager.create_tmp_file("Sorting")?;
+                        let mut writer = IPCWriter::new(spill_file.path(), 
&self.schema)?;
+                        // Flush everything in memory to the spill file
+                        for batch in self.in_mem_batches.drain(..) {
+                            writer.write(&batch)?;
+                        }
+                        // as well as the newly sorted batch
+                        writer.write(&batch)?;
+                        spill_writer = Some(writer);
+                        self.reservation.free();
+                        self.spills.push(spill_file);
+                    } else {
+                        self.in_mem_batches.push(batch);
+
+                        // Gives back memory for merging the next batch.
+                        self.reservation.shrink(sort_merge_minimum_overhead);
+                    }
+                }
+                Some(writer) => {
+                    writer.write(&batch)?;
+                }
+            }
+        }
 
-        let size: usize = self
-            .in_mem_batches
-            .iter()
-            .map(get_record_batch_memory_size)
-            .sum();
+        // Drop early to free up memory reserved by the sorted stream, 
otherwise the
+        // upcoming `self.reserve_memory_for_merge()` may fail due to 
insufficient memory.
+        drop(sorted_stream);
+
+        if let Some(writer) = &mut spill_writer {
+            writer.finish()?;
+            self.metrics.spill_count.add(1);
+            self.metrics.spilled_rows.add(writer.num_rows);
+            self.metrics.spilled_bytes.add(writer.num_bytes);
+        }
+
+        // Sorting may free up some memory especially when fetch is `Some`. If 
we have
+        // not freed more than 50% of the memory, then we have to spill to 
free up more
+        // memory for inserting more batches.
+        if spill_writer.is_none() && self.reservation.size() > before / 2 {

Review Comment:
   The comment says that when `fetch` is `Some(N)`, the sorted batches will 
be truncated to only the top `N` rows. This will make the sorted batches smaller 
than the original. There's a test case `test_sort_fetch_memory_calculation` for 
this.



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -408,50 +395,114 @@ impl ExternalSorter {
 
         debug!("Spilling sort data of ExternalSorter to disk whilst 
inserting");
 
-        self.in_mem_sort().await?;
-
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
-        let spilled_rows = spill_record_batches(
+        let (spilled_rows, spilled_bytes) = spill_record_batches(
             batches,
             spill_file.path().into(),
             Arc::clone(&self.schema),
         )?;
         let used = self.reservation.free();
         self.metrics.spill_count.add(1);
-        self.metrics.spilled_bytes.add(used);
+        self.metrics.spilled_bytes.add(spilled_bytes);
         self.metrics.spilled_rows.add(spilled_rows);
         self.spills.push(spill_file);
         Ok(used)
     }
 
     /// Sorts the in_mem_batches in place
-    async fn in_mem_sort(&mut self) -> Result<()> {
-        if self.in_mem_batches_sorted {
-            return Ok(());
-        }
-
+    ///
+    /// Sorting may have freed memory, especially if fetch is `Some`. If
+    /// the memory usage has dropped by a factor of 2, then we don't have
+    /// to spill. Otherwise, we spill to free up memory for inserting
+    /// more batches.
+    ///
+    /// The factor of 2 aims to avoid a degenerate case where the
+    /// memory required for `fetch` is just under the memory available,
+    // causing repeated re-sorting of data
+    async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
         // Release the memory reserved for merge back to the pool so
-        // there is some left when `in_memo_sort_stream` requests an
+        // there is some left when `in_mem_sort_stream` requests an
         // allocation.
         self.merge_reservation.free();
 
-        self.in_mem_batches = self
-            .in_mem_sort_stream(self.metrics.baseline.intermediate())?
-            .try_collect()
-            .await?;
+        let before = self.reservation.size();
+
+        let mut sorted_stream =
+            self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+
+        // `self.in_mem_batches` is already taken away by the sort_stream, now 
it is empty.
+        // We'll gradually collect the sorted stream into self.in_mem_batches, 
or directly
+        // write sorted batches to disk when the memory is insufficient.
+        let mut spill_writer: Option<IPCWriter> = None;
+        // Leave at least 1/3 of spill reservation for sort/merge the next 
batch. Here the
+        // 1/3 is simply an arbitrary chosen number.
+        let sort_merge_minimum_overhead = self.sort_spill_reservation_bytes / 
3;
+        while let Some(batch) = sorted_stream.next().await {
+            let batch = batch?;
+            match &mut spill_writer {
+                None => {
+                    let sorted_size = 
get_reserved_byte_for_record_batch(&batch);
+
+                    // We reserve more memory to ensure that we'll have enough 
memory for
+                    // `SortPreservingMergeStream` after consuming this batch, 
otherwise we'll
+                    // start spilling everything to disk.
+                    if self
+                        .reservation
+                        .try_grow(sorted_size + sort_merge_minimum_overhead)
+                        .is_err()
+                    {
+                        // Directly write in_mem_batches as well as all the 
remaining batches in
+                        // sorted_stream to disk. Further batches fetched from 
`sorted_stream` will
+                        // be handled by the `Some(writer)` matching arm.
+                        let spill_file =
+                            
self.runtime.disk_manager.create_tmp_file("Sorting")?;
+                        let mut writer = IPCWriter::new(spill_file.path(), 
&self.schema)?;
+                        // Flush everything in memory to the spill file
+                        for batch in self.in_mem_batches.drain(..) {
+                            writer.write(&batch)?;
+                        }
+                        // as well as the newly sorted batch
+                        writer.write(&batch)?;
+                        spill_writer = Some(writer);
+                        self.reservation.free();
+                        self.spills.push(spill_file);
+                    } else {
+                        self.in_mem_batches.push(batch);
+
+                        // Gives back memory for merging the next batch.
+                        self.reservation.shrink(sort_merge_minimum_overhead);
+                    }
+                }
+                Some(writer) => {
+                    writer.write(&batch)?;
+                }
+            }
+        }
 
-        let size: usize = self
-            .in_mem_batches
-            .iter()
-            .map(get_record_batch_memory_size)
-            .sum();
+        // Drop early to free up memory reserved by the sorted stream, 
otherwise the
+        // upcoming `self.reserve_memory_for_merge()` may fail due to 
insufficient memory.
+        drop(sorted_stream);
+
+        if let Some(writer) = &mut spill_writer {
+            writer.finish()?;
+            self.metrics.spill_count.add(1);
+            self.metrics.spilled_rows.add(writer.num_rows);
+            self.metrics.spilled_bytes.add(writer.num_bytes);
+        }
+
+        // Sorting may free up some memory especially when fetch is `Some`. If 
we have
+        // not freed more than 50% of the memory, then we have to spill to 
free up more
+        // memory for inserting more batches.
+        if spill_writer.is_none() && self.reservation.size() > before / 2 {

Review Comment:
   The comment says that when `fetch` is `Some(N)`, the sorted batches will 
be truncated to only the top `N` rows. This will make the sorted batches smaller 
than the original. There's a test case `test_sort_fetch_memory_calculation` for 
this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to