This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d288b80203 fix: External sort failing on an edge case (#15017)
d288b80203 is described below

commit d288b80203eba2906b830ea7bed9828e77eced1c
Author: Yongting You <[email protected]>
AuthorDate: Thu Mar 6 04:05:03 2025 +0800

    fix: External sort failing on an edge case (#15017)
    
    * fix external sort failure
    
    * clippy
    
    * review
---
 datafusion/core/tests/memory_limit/mod.rs  | 25 +++++++++++++++++++++++
 datafusion/physical-plan/src/sorts/sort.rs | 32 +++++++++++++++++++-----------
 2 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 2deb8fde2d..8f690edc54 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -468,6 +468,31 @@ async fn test_stringview_external_sort() {
     let _ = df.collect().await.expect("Query execution failed");
 }
 
+/// This test case is for a previously detected bug:
+/// When `ExternalSorter` has read all input batches
+/// - It has spilled many sorted runs to disk
+/// - Its in-memory buffer for batches is almost full
+/// The previous implementation will try to merge the spills and in-memory batches
+/// together, without spilling the in-memory batches first, causing OOM.
+#[tokio::test]
+async fn test_in_mem_buffer_almost_full() {
+    let config = SessionConfig::new()
+        .with_sort_spill_reservation_bytes(3000000)
+        .with_target_partitions(1);
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
+        .build_arc()
+        .unwrap();
+
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+    let query = "select * from generate_series(1,9000000) as t1(v1) order by v1;";
+    let df = ctx.sql(query).await.unwrap();
+
+    // Check not fail
+    let _ = df.collect().await.unwrap();
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index a6c5dbec74..55ba77096a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -307,7 +307,7 @@ impl ExternalSorter {
 
         let size = get_reserved_byte_for_record_batch(&input);
         if self.reservation.try_grow(size).is_err() {
-            self.sort_or_spill_in_mem_batches().await?;
+            self.sort_or_spill_in_mem_batches(false).await?;
             // We've already freed more than half of reserved memory,
             // so we can grow the reservation again. There's nothing we can do
             // if this try_grow fails.
@@ -332,7 +332,7 @@ impl ExternalSorter {
     ///
     /// 2. A combined streaming merge incorporating both in-memory
     ///    batches and data from spill files on disk.
-    fn sort(&mut self) -> Result<SendableRecordBatchStream> {
+    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
         // Release the memory reserved for merge back to the pool so
         // there is some left when `in_mem_sort_stream` requests an
         // allocation.
@@ -340,10 +340,12 @@ impl ExternalSorter {
 
         if self.spilled_before() {
             let mut streams = vec![];
+
+            // Sort `in_mem_batches` and spill it first. If there are many
+            // `in_mem_batches` and the memory limit is almost reached, merging
+            // them with the spilled files at the same time might cause OOM.
             if !self.in_mem_batches.is_empty() {
-                let in_mem_stream =
-                    self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
-                streams.push(in_mem_stream);
+                self.sort_or_spill_in_mem_batches(true).await?;
             }
 
             for spill in self.spills.drain(..) {
@@ -488,11 +490,17 @@ impl ExternalSorter {
     /// the memory usage has dropped by a factor of 2, then we don't have
     /// to spill. Otherwise, we spill to free up memory for inserting
     /// more batches.
-    ///
     /// The factor of 2 aims to avoid a degenerate case where the
     /// memory required for `fetch` is just under the memory available,
-    // causing repeated re-sorting of data
-    async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
+    /// causing repeated re-sorting of data
+    ///
+    /// # Arguments
+    ///
+    /// * `force_spill` - If true, the method will spill the in-memory batches
+    ///   even if the memory usage has not dropped by a factor of 2. Otherwise it will
+    ///   only spill when the memory usage has dropped by the pre-defined factor.
+    ///
+    async fn sort_or_spill_in_mem_batches(&mut self, force_spill: bool) -> Result<()> {
         // Release the memory reserved for merge back to the pool so
         // there is some left when `in_mem_sort_stream` requests an
         // allocation. At the end of this function, memory will be
@@ -529,7 +537,7 @@ impl ExternalSorter {
         // Sorting may free up some memory especially when fetch is `Some`. If we have
         // not freed more than 50% of the memory, then we have to spill to free up more
         // memory for inserting more batches.
-        if self.reservation.size() > before / 2 {
+        if (self.reservation.size() > before / 2) || force_spill {
+            // We have not freed more than 50% of the memory, so we have to spill to
+            // free up more memory
             self.spill().await?;
@@ -1114,7 +1122,7 @@ impl ExecutionPlan for SortExec {
                             let batch = batch?;
                             sorter.insert_batch(batch).await?;
                         }
-                        sorter.sort()
+                        sorter.sort().await
                     })
                     .try_flatten(),
                 )))
@@ -1409,7 +1417,7 @@ mod tests {
         // bytes. We leave a little wiggle room for the actual numbers.
         assert!((3..=10).contains(&spill_count));
         assert!((9000..=10000).contains(&spilled_rows));
-        assert!((36000..=40000).contains(&spilled_bytes));
+        assert!((38000..=42000).contains(&spilled_bytes));
 
         let columns = result[0].columns();
 
@@ -1482,7 +1490,7 @@ mod tests {
         //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
         assert!((12..=18).contains(&spill_count));
         assert!((15000..=20000).contains(&spilled_rows));
-        assert!((700000..=900000).contains(&spilled_bytes));
+        assert!((900000..=1000000).contains(&spilled_bytes));
 
         // Verify that the result is sorted
         let concated_result = concat_batches(&schema, &result)?;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to