This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1171574286 fix: Assertion fail in external sort (#15469)
1171574286 is described below
commit 11715742863be352629c89b7b3d5318686d0ac48
Author: Yongting You <[email protected]>
AuthorDate: Mon Mar 31 04:38:02 2025 +0800
fix: Assertion fail in external sort (#15469)
* fix assertion fail in external sort by refactoring
* clippy
* avoid assert
---
datafusion/core/tests/memory_limit/mod.rs | 31 ++++++++++
datafusion/physical-plan/src/sorts/sort.rs | 97 ++++++++++++++++++------------
2 files changed, 89 insertions(+), 39 deletions(-)
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index dd5acc8d89..2702954e77 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -49,6 +49,7 @@ use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::common::collect;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use rand::Rng;
use test_utils::AccessLogGenerator;
@@ -493,6 +494,36 @@ async fn test_in_mem_buffer_almost_full() {
let _ = df.collect().await.unwrap();
}
+/// External sort should be able to run if there is very little pre-reserved memory
+/// for merge (set configuration sort_spill_reservation_bytes to 0).
+#[tokio::test]
+async fn test_external_sort_zero_merge_reservation() {
+ let config = SessionConfig::new()
+ .with_sort_spill_reservation_bytes(0)
+ .with_target_partitions(14);
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
+ .build_arc()
+ .unwrap();
+
+ let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+ let query = "select * from generate_series(1,10000000) as t1(v1) order by
v1;";
+ let df = ctx.sql(query).await.unwrap();
+
+ let physical_plan = df.create_physical_plan().await.unwrap();
+ let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+ let stream = physical_plan.execute(0, task_ctx).unwrap();
+
+ // Ensures execution succeeds
+ let _result = collect(stream).await;
+
+ // Ensures the query spilled during execution
+ let metrics = physical_plan.metrics().unwrap();
+ let spill_count = metrics.spill_count().unwrap();
+ assert!(spill_count > 0);
+}
+
/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 11c3212f53..1072e9abf4 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -220,10 +220,8 @@ struct ExternalSorter {
// STATE BUFFERS:
// Fields that hold intermediate data during sorting
// ========================================================================
- /// Potentially unsorted in memory buffer
+ /// Unsorted input batches stored in the memory buffer
in_mem_batches: Vec<RecordBatch>,
- /// if `Self::in_mem_batches` are sorted
- in_mem_batches_sorted: bool,
/// During external sorting, in-memory intermediate data will be appended to
/// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
@@ -304,7 +302,6 @@ impl ExternalSorter {
Ok(Self {
schema,
in_mem_batches: vec![],
- in_mem_batches_sorted: false,
in_progress_spill_file: None,
finished_spill_files: vec![],
expr: expr.into(),
@@ -341,7 +338,6 @@ impl ExternalSorter {
}
self.in_mem_batches.push(input);
- self.in_mem_batches_sorted = false;
Ok(())
}
@@ -418,16 +414,13 @@ impl ExternalSorter {
self.metrics.spill_metrics.spill_file_count.value()
}
- /// When calling, all `in_mem_batches` must be sorted (*), and then all of them will
- /// be appended to the in-progress spill file.
- ///
- /// (*) 'Sorted' here means globally sorted for all buffered batches when the
- /// memory limit is reached, instead of partially sorted within the batch.
- async fn spill_append(&mut self) -> Result<()> {
- assert!(self.in_mem_batches_sorted);
-
- // we could always get a chance to free some memory as long as we are holding some
- if self.in_mem_batches.is_empty() {
+ /// Appends globally sorted batches to the in-progress spill file, and clears
+ /// the `globally_sorted_batches` (also its memory reservation) afterwards.
+ async fn consume_and_spill_append(
+ &mut self,
+ globally_sorted_batches: &mut Vec<RecordBatch>,
+ ) -> Result<()> {
+ if globally_sorted_batches.is_empty() {
return Ok(());
}
@@ -437,21 +430,25 @@ impl ExternalSorter {
Some(self.spill_manager.create_in_progress_file("Sorting")?);
}
- self.organize_stringview_arrays()?;
+ Self::organize_stringview_arrays(globally_sorted_batches)?;
debug!("Spilling sort data of ExternalSorter to disk whilst
inserting");
- let batches = std::mem::take(&mut self.in_mem_batches);
+ let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
internal_datafusion_err!("In-progress spill file should be initialized")
})?;
- for batch in batches {
+ for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
}
+ if !globally_sorted_batches.is_empty() {
+ return internal_err!("This function consumes
globally_sorted_batches, so it should be empty after taking.");
+ }
+
Ok(())
}
@@ -470,7 +467,7 @@ impl ExternalSorter {
Ok(())
}
- /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
+ /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
/// `StringViewArray` in sequential order by calling `gc()` on them.
///
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
@@ -499,10 +496,12 @@ impl ExternalSorter {
///
/// Then when spilling each batch, the writer has to write all referenced buffers
/// repeatedly.
- fn organize_stringview_arrays(&mut self) -> Result<()> {
- let mut organized_batches = Vec::with_capacity(self.in_mem_batches.len());
+ fn organize_stringview_arrays(
+ globally_sorted_batches: &mut Vec<RecordBatch>,
+ ) -> Result<()> {
+ let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
- for batch in self.in_mem_batches.drain(..) {
+ for batch in globally_sorted_batches.drain(..) {
let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
@@ -528,20 +527,17 @@ impl ExternalSorter {
organized_batches.push(organized_batch);
}
- self.in_mem_batches = organized_batches;
+ *globally_sorted_batches = organized_batches;
Ok(())
}
- /// Sorts the in_mem_batches in place
+ /// Sorts the in_mem_batches and potentially spills the sorted batches.
///
- /// Sorting may have freed memory, especially if fetch is `Some`. If
- /// the memory usage has dropped by a factor of 2, then we don't have
- /// to spill. Otherwise, we spill to free up memory for inserting
- /// more batches.
- /// The factor of 2 aims to avoid a degenerate case where the
- /// memory required for `fetch` is just under the memory available,
- /// causing repeated re-sorting of data
+ /// If the memory usage has dropped by a factor of 2, it might be a sort with
+ /// fetch (e.g. sorting 1M rows but only keeping the top 100), so we keep the
+ /// sorted entries inside `in_mem_batches` to be sorted in the next iteration.
+ /// Otherwise, we spill the sorted run to free up memory for inserting more batches.
///
/// # Arguments
///
@@ -560,10 +556,18 @@ impl ExternalSorter {
let mut sorted_stream = self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+ // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` are taken
+ // to construct a globally sorted stream.
+ if !self.in_mem_batches.is_empty() {
+ return internal_err!(
+ "in_mem_batches should be empty after constructing sorted
stream"
+ );
+ }
+ // 'global' here refers to all buffered batches when the memory limit is
+ // reached. This variable will buffer the sorted batches after
+ // sort-preserving merge and incrementally append to spill files.
+ let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
- // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
- // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
- // write sorted batches to disk when the memory is insufficient.
let mut spilled = false;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
@@ -572,12 +576,12 @@ impl ExternalSorter {
// Although the reservation is not enough, the batch is
// already in memory, so it's okay to combine it with previously
// sorted batches, and spill together.
- self.in_mem_batches.push(batch);
- self.spill_append().await?; // reservation is freed in spill()
+ globally_sorted_batches.push(batch);
+ self.consume_and_spill_append(&mut globally_sorted_batches)
+ .await?; // reservation is freed in spill()
spilled = true;
} else {
- self.in_mem_batches.push(batch);
- self.in_mem_batches_sorted = true;
+ globally_sorted_batches.push(batch);
}
}
@@ -591,12 +595,27 @@ impl ExternalSorter {
if (self.reservation.size() > before / 2) || force_spill {
// We have not freed more than 50% of the memory, so we have to spill to
// free up more memory
- self.spill_append().await?;
+ self.consume_and_spill_append(&mut globally_sorted_batches)
+ .await?;
spilled = true;
}
if spilled {
+ // There might be some buffered batches that haven't triggered a spill yet.
+ self.consume_and_spill_append(&mut globally_sorted_batches)
+ .await?;
self.spill_finish().await?;
+ } else {
+ // If the memory limit has been reached before calling this function, and it
+ // didn't spill anything, it means this is a sort with a fetch (top-K):
+ // after sorting, only the top K elements will be kept in memory.
+ // For simplicity, those sorted top-K entries are put back into the unsorted
+ // `in_mem_batches` to be consumed by the next sort/merge.
+ if !self.in_mem_batches.is_empty() {
+ return internal_err!("in_mem_batches should be cleared before");
+ }
+ }
+
+ self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
}
// Reserve headroom for next sort/merge
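
[Editor's note] For readers following the refactor: below is a minimal, self-contained sketch of the consume-and-spill pattern this commit introduces. Batch and InProgressSpillFile here are simplified stand-ins (the real types are arrow's RecordBatch and DataFusion's in-progress spill file), so this illustrates the ownership discipline rather than the actual implementation.

// Simplified stand-ins for RecordBatch and the spill writer.
struct Batch(Vec<i32>);

struct InProgressSpillFile {
    appended: usize,
}

impl InProgressSpillFile {
    fn append_batch(&mut self, _batch: &Batch) -> Result<(), String> {
        self.appended += 1;
        Ok(())
    }
}

/// Mirrors the commit's `consume_and_spill_append`: the caller keeps ownership
/// of the buffer, the function takes its contents and leaves it empty, so no
/// separate flag like the removed `in_mem_batches_sorted` is needed to track
/// buffer state.
fn consume_and_spill_append(
    file: &mut InProgressSpillFile,
    globally_sorted_batches: &mut Vec<Batch>,
) -> Result<(), String> {
    if globally_sorted_batches.is_empty() {
        return Ok(());
    }
    let batches_to_spill = std::mem::take(globally_sorted_batches);
    for batch in batches_to_spill {
        file.append_batch(&batch)?;
    }
    // The buffer must be empty now; a violation is reported as an internal
    // error rather than asserted (the assertion was the original failure).
    if !globally_sorted_batches.is_empty() {
        return Err("globally_sorted_batches should be empty after take".into());
    }
    Ok(())
}

fn main() {
    let mut file = InProgressSpillFile { appended: 0 };
    let mut buf = vec![Batch(vec![3, 1, 2])];
    consume_and_spill_append(&mut file, &mut buf).unwrap();
    assert!(buf.is_empty());
    assert_eq!(file.appended, 1);
}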
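[Editor's note] The new test can also be reproduced as a standalone program. Here is a sketch assuming a recent DataFusion release; the calls mirror the test above, though the exact import paths (e.g. datafusion::execution::memory_pool::FairSpillPool) may differ between versions.

use std::sync::Arc;

use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // No memory is pre-reserved for the merge phase, and the pool is small
    // enough that sorting 10M rows must spill to disk.
    let config = SessionConfig::new().with_sort_spill_reservation_bytes(0);
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(config, runtime);

    let df = ctx
        .sql("select * from generate_series(1, 10000000) as t1(v1) order by v1")
        .await?;
    // Before the fix this could abort on `assert!(self.in_mem_batches_sorted)`;
    // with the fix it completes by spilling.
    let _ = df.collect().await?;
    Ok(())
}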
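[Editor's note] The organize_stringview_arrays step relies on arrow-rs's gc() for view arrays. A small illustration of why it matters, assuming the arrow crate is available: a sliced StringViewArray keeps referencing the original payload buffers, and gc() compacts them so a spill writer only serializes the bytes the batch actually uses.

use arrow::array::{Array, StringViewArray};

fn main() {
    // Strings longer than 12 bytes live in shared payload buffers.
    let array =
        StringViewArray::from(vec!["this string is long enough to be buffered"; 1024]);

    // A 2-row slice still points at the buffers holding all 1024 values, so
    // writing it out as-is would copy far more bytes than needed.
    let sliced = array.slice(0, 2);

    // gc() rewrites the views into freshly compacted buffers.
    let compacted = sliced.gc();
    assert_eq!(sliced.len(), compacted.len());
    assert!(compacted.get_buffer_memory_size() < sliced.get_buffer_memory_size());
}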
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]