This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1171574286 fix: Assertion fail in external sort (#15469)
1171574286 is described below

commit 11715742863be352629c89b7b3d5318686d0ac48
Author: Yongting You <[email protected]>
AuthorDate: Mon Mar 31 04:38:02 2025 +0800

    fix: Assertion fail in external sort (#15469)
    
    * fix assertion fail in external sort by refactoring
    
    * clippy
    
    * avoid assert
---
 datafusion/core/tests/memory_limit/mod.rs  | 31 ++++++++++
 datafusion/physical-plan/src/sorts/sort.rs | 97 ++++++++++++++++++------------
 2 files changed, 89 insertions(+), 39 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs 
b/datafusion/core/tests/memory_limit/mod.rs
index dd5acc8d89..2702954e77 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -49,6 +49,7 @@ use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::join_selection::JoinSelection;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::common::collect;
 use datafusion_physical_plan::spill::get_record_batch_memory_size;
 use rand::Rng;
 use test_utils::AccessLogGenerator;
@@ -493,6 +494,36 @@ async fn test_in_mem_buffer_almost_full() {
     let _ = df.collect().await.unwrap();
 }
 
+/// External sort should be able to run if there is very little pre-reserved 
memory
+/// for merge (set configuration sort_spill_reservation_bytes to 0).
+#[tokio::test]
+async fn test_external_sort_zero_merge_reservation() {
+    let config = SessionConfig::new()
+        .with_sort_spill_reservation_bytes(0)
+        .with_target_partitions(14);
+    let runtime = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
+        .build_arc()
+        .unwrap();
+
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+    let query = "select * from generate_series(1,10000000) as t1(v1) order by 
v1;";
+    let df = ctx.sql(query).await.unwrap();
+
+    let physical_plan = df.create_physical_plan().await.unwrap();
+    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+    let stream = physical_plan.execute(0, task_ctx).unwrap();
+
+    // Ensures execution succeed
+    let _result = collect(stream).await;
+
+    // Ensures the query spilled during execution
+    let metrics = physical_plan.metrics().unwrap();
+    let spill_count = metrics.spill_count().unwrap();
+    assert!(spill_count > 0);
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index 11c3212f53..1072e9abf4 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -220,10 +220,8 @@ struct ExternalSorter {
     // STATE BUFFERS:
     // Fields that hold intermediate data during sorting
     // ========================================================================
-    /// Potentially unsorted in memory buffer
+    /// Unsorted input batches stored in the memory buffer
     in_mem_batches: Vec<RecordBatch>,
-    /// if `Self::in_mem_batches` are sorted
-    in_mem_batches_sorted: bool,
 
     /// During external sorting, in-memory intermediate data will be appended 
to
     /// this file incrementally. Once finished, this file will be moved to 
[`Self::finished_spill_files`].
@@ -304,7 +302,6 @@ impl ExternalSorter {
         Ok(Self {
             schema,
             in_mem_batches: vec![],
-            in_mem_batches_sorted: false,
             in_progress_spill_file: None,
             finished_spill_files: vec![],
             expr: expr.into(),
@@ -341,7 +338,6 @@ impl ExternalSorter {
         }
 
         self.in_mem_batches.push(input);
-        self.in_mem_batches_sorted = false;
         Ok(())
     }
 
@@ -418,16 +414,13 @@ impl ExternalSorter {
         self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// When calling, all `in_mem_batches` must be sorted (*), and then all of 
them will
-    /// be appended to the in-progress spill file.
-    ///
-    /// (*) 'Sorted' here means globally sorted for all buffered batches when 
the
-    /// memory limit is reached, instead of partially sorted within the batch.
-    async fn spill_append(&mut self) -> Result<()> {
-        assert!(self.in_mem_batches_sorted);
-
-        // we could always get a chance to free some memory as long as we are 
holding some
-        if self.in_mem_batches.is_empty() {
+    /// Appending globally sorted batches to the in-progress spill file, and 
clears
+    /// the `globally_sorted_batches` (also its memory reservation) afterwards.
+    async fn consume_and_spill_append(
+        &mut self,
+        globally_sorted_batches: &mut Vec<RecordBatch>,
+    ) -> Result<()> {
+        if globally_sorted_batches.is_empty() {
             return Ok(());
         }
 
@@ -437,21 +430,25 @@ impl ExternalSorter {
                 Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }
 
-        self.organize_stringview_arrays()?;
+        Self::organize_stringview_arrays(globally_sorted_batches)?;
 
         debug!("Spilling sort data of ExternalSorter to disk whilst 
inserting");
 
-        let batches = std::mem::take(&mut self.in_mem_batches);
+        let batches_to_spill = std::mem::take(globally_sorted_batches);
         self.reservation.free();
 
         let in_progress_file = 
self.in_progress_spill_file.as_mut().ok_or_else(|| {
             internal_datafusion_err!("In-progress spill file should be 
initialized")
         })?;
 
-        for batch in batches {
+        for batch in batches_to_spill {
             in_progress_file.append_batch(&batch)?;
         }
 
+        if !globally_sorted_batches.is_empty() {
+            return internal_err!("This function consumes 
globally_sorted_batches, so it should be empty after taking.");
+        }
+
         Ok(())
     }
 
@@ -470,7 +467,7 @@ impl ExternalSorter {
         Ok(())
     }
 
-    /// Reconstruct `self.in_mem_batches` to organize the payload buffers of 
each
+    /// Reconstruct `globally_sorted_batches` to organize the payload buffers 
of each
     /// `StringViewArray` in sequential order by calling `gc()` on them.
     ///
     /// Note this is a workaround until 
<https://github.com/apache/arrow-rs/issues/7185> is
@@ -499,10 +496,12 @@ impl ExternalSorter {
     ///
     /// Then when spilling each batch, the writer has to write all referenced 
buffers
     /// repeatedly.
-    fn organize_stringview_arrays(&mut self) -> Result<()> {
-        let mut organized_batches = 
Vec::with_capacity(self.in_mem_batches.len());
+    fn organize_stringview_arrays(
+        globally_sorted_batches: &mut Vec<RecordBatch>,
+    ) -> Result<()> {
+        let mut organized_batches = 
Vec::with_capacity(globally_sorted_batches.len());
 
-        for batch in self.in_mem_batches.drain(..) {
+        for batch in globally_sorted_batches.drain(..) {
             let mut new_columns: Vec<Arc<dyn Array>> =
                 Vec::with_capacity(batch.num_columns());
 
@@ -528,20 +527,17 @@ impl ExternalSorter {
             organized_batches.push(organized_batch);
         }
 
-        self.in_mem_batches = organized_batches;
+        *globally_sorted_batches = organized_batches;
 
         Ok(())
     }
 
-    /// Sorts the in_mem_batches in place
+    /// Sorts the in_mem_batches and potentially spill the sorted batches.
     ///
-    /// Sorting may have freed memory, especially if fetch is `Some`. If
-    /// the memory usage has dropped by a factor of 2, then we don't have
-    /// to spill. Otherwise, we spill to free up memory for inserting
-    /// more batches.
-    /// The factor of 2 aims to avoid a degenerate case where the
-    /// memory required for `fetch` is just under the memory available,
-    /// causing repeated re-sorting of data
+    /// If the memory usage has dropped by a factor of 2, it might be a sort 
with
+    /// fetch (e.g. sorting 1M rows but only keep the top 100), so we keep the
+    /// sorted entries inside `in_mem_batches` to be sorted in the next 
iteration.
+    /// Otherwise, we spill the sorted run to free up memory for inserting 
more batches.
     ///
     /// # Arguments
     ///
@@ -560,10 +556,18 @@ impl ExternalSorter {
 
         let mut sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+        // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` 
is taken
+        // to construct a globally sorted stream.
+        if !self.in_mem_batches.is_empty() {
+            return internal_err!(
+                "in_mem_batches should be empty after constructing sorted 
stream"
+            );
+        }
+        // 'global' here refers to all buffered batches when the memory limit 
is
+        // reached. This variable will buffer the sorted batches after
+        // sort-preserving merge and incrementally append to spill files.
+        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
 
-        // `self.in_mem_batches` is already taken away by the sort_stream, now 
it is empty.
-        // We'll gradually collect the sorted stream into self.in_mem_batches, 
or directly
-        // write sorted batches to disk when the memory is insufficient.
         let mut spilled = false;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
@@ -572,12 +576,12 @@ impl ExternalSorter {
                 // Although the reservation is not enough, the batch is
                 // already in memory, so it's okay to combine it with 
previously
                 // sorted batches, and spill together.
-                self.in_mem_batches.push(batch);
-                self.spill_append().await?; // reservation is freed in spill()
+                globally_sorted_batches.push(batch);
+                self.consume_and_spill_append(&mut globally_sorted_batches)
+                    .await?; // reservation is freed in spill()
                 spilled = true;
             } else {
-                self.in_mem_batches.push(batch);
-                self.in_mem_batches_sorted = true;
+                globally_sorted_batches.push(batch);
             }
         }
 
@@ -591,12 +595,27 @@ impl ExternalSorter {
         if (self.reservation.size() > before / 2) || force_spill {
             // We have not freed more than 50% of the memory, so we have to 
spill to
             // free up more memory
-            self.spill_append().await?;
+            self.consume_and_spill_append(&mut globally_sorted_batches)
+                .await?;
             spilled = true;
         }
 
         if spilled {
+            // There might be some buffered batches that haven't trigger a 
spill yet.
+            self.consume_and_spill_append(&mut globally_sorted_batches)
+                .await?;
             self.spill_finish().await?;
+        } else {
+            // If the memory limit has reached before calling this function, 
and it
+            // didn't spill anything, it means this is a sorting with fetch 
top K
+            // element: after sorting only the top K elements will be kept in 
memory.
+            // For simplicity, those sorted top K entries are put back to 
unsorted
+            // `in_mem_batches` to be consumed by the next sort/merge.
+            if !self.in_mem_batches.is_empty() {
+                return internal_err!("in_mem_batches should be cleared 
before");
+            }
+
+            self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
         }
 
         // Reserve headroom for next sort/merge


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to