This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d288b80203 fix: External sort failing on an edge case (#15017)
d288b80203 is described below
commit d288b80203eba2906b830ea7bed9828e77eced1c
Author: Yongting You <[email protected]>
AuthorDate: Thu Mar 6 04:05:03 2025 +0800
fix: External sort failing on an edge case (#15017)
* fix external sort failure
* clippy
* review
---
datafusion/core/tests/memory_limit/mod.rs | 25 +++++++++++++++++++++++
datafusion/physical-plan/src/sorts/sort.rs | 32 +++++++++++++++++++-----------
2 files changed, 45 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 2deb8fde2d..8f690edc54 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -468,6 +468,31 @@ async fn test_stringview_external_sort() {
let _ = df.collect().await.expect("Query execution failed");
}
+/// This test case is for a previously detected bug:
+/// When `ExternalSorter` has read all input batches
+/// - It has spilled many sorted runs to disk
+/// - Its in-memory buffer for batches is almost full
+/// The previous implementation will try to merge the spills and in-memory batches
+/// together, without spilling the in-memory batches first, causing OOM.
+#[tokio::test]
+async fn test_in_mem_buffer_almost_full() {
+ let config = SessionConfig::new()
+ .with_sort_spill_reservation_bytes(3000000)
+ .with_target_partitions(1);
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_pool(Arc::new(FairSpillPool::new(10 * 1024 * 1024)))
+ .build_arc()
+ .unwrap();
+
+ let ctx = SessionContext::new_with_config_rt(config, runtime);
+
+ let query = "select * from generate_series(1,9000000) as t1(v1) order by v1;";
+ let df = ctx.sql(query).await.unwrap();
+
+ // Check not fail
+ let _ = df.collect().await.unwrap();
+}
+
/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index a6c5dbec74..55ba77096a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -307,7 +307,7 @@ impl ExternalSorter {
let size = get_reserved_byte_for_record_batch(&input);
if self.reservation.try_grow(size).is_err() {
- self.sort_or_spill_in_mem_batches().await?;
+ self.sort_or_spill_in_mem_batches(false).await?;
// We've already freed more than half of reserved memory,
// so we can grow the reservation again. There's nothing we can do
// if this try_grow fails.
@@ -332,7 +332,7 @@ impl ExternalSorter {
///
/// 2. A combined streaming merge incorporating both in-memory
/// batches and data from spill files on disk.
- fn sort(&mut self) -> Result<SendableRecordBatchStream> {
+ async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
// Release the memory reserved for merge back to the pool so
// there is some left when `in_mem_sort_stream` requests an
// allocation.
@@ -340,10 +340,12 @@ impl ExternalSorter {
if self.spilled_before() {
let mut streams = vec![];
+
+ // Sort `in_mem_batches` and spill it first. If there are many
+ // `in_mem_batches` and the memory limit is almost reached, merging
+ // them with the spilled files at the same time might cause OOM.
if !self.in_mem_batches.is_empty() {
- let in_mem_stream =
- self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
- streams.push(in_mem_stream);
+ self.sort_or_spill_in_mem_batches(true).await?;
}
for spill in self.spills.drain(..) {
@@ -488,11 +490,17 @@ impl ExternalSorter {
/// the memory usage has dropped by a factor of 2, then we don't have
/// to spill. Otherwise, we spill to free up memory for inserting
/// more batches.
- ///
/// The factor of 2 aims to avoid a degenerate case where the
/// memory required for `fetch` is just under the memory available,
- // causing repeated re-sorting of data
- async fn sort_or_spill_in_mem_batches(&mut self) -> Result<()> {
+ /// causing repeated re-sorting of data
+ ///
+ /// # Arguments
+ ///
+ /// * `force_spill` - If true, the method will spill the in-memory batches
+ /// even if the memory usage has not dropped by a factor of 2. Otherwise it will
+ /// only spill when the memory usage has dropped by the pre-defined factor.
+ ///
+ async fn sort_or_spill_in_mem_batches(&mut self, force_spill: bool) -> Result<()> {
// Release the memory reserved for merge back to the pool so
// there is some left when `in_mem_sort_stream` requests an
// allocation. At the end of this function, memory will be
@@ -529,7 +537,7 @@ impl ExternalSorter {
// Sorting may free up some memory especially when fetch is `Some`. If we have
// not freed more than 50% of the memory, then we have to spill to free up more
// memory for inserting more batches.
- if self.reservation.size() > before / 2 {
+ if (self.reservation.size() > before / 2) || force_spill {
// We have not freed more than 50% of the memory, so we have to spill to
// free up more memory
self.spill().await?;
@@ -1114,7 +1122,7 @@ impl ExecutionPlan for SortExec {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
- sorter.sort()
+ sorter.sort().await
})
.try_flatten(),
)))
@@ -1409,7 +1417,7 @@ mod tests {
// bytes. We leave a little wiggle room for the actual numbers.
assert!((3..=10).contains(&spill_count));
assert!((9000..=10000).contains(&spilled_rows));
- assert!((36000..=40000).contains(&spilled_bytes));
+ assert!((38000..=42000).contains(&spilled_bytes));
let columns = result[0].columns();
@@ -1482,7 +1490,7 @@ mod tests {
// `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
assert!((12..=18).contains(&spill_count));
assert!((15000..=20000).contains(&spilled_rows));
- assert!((700000..=900000).contains(&spilled_bytes));
+ assert!((900000..=1000000).contains(&spilled_bytes));
// Verify that the result is sorted
let concated_result = concat_batches(&schema, &result)?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]