This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0d65394f2 Improve error messages when memory is exhausted while sorting (#4348)
0d65394f2 is described below
commit 0d65394f2d4b36938e549cf1686cef1c51813eb5
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Nov 25 13:37:49 2022 -0500
Improve error messages when memory is exhausted while sorting (#4348)
---
datafusion/core/src/execution/disk_manager.rs | 28 ++++++++++++++-----------
datafusion/core/src/physical_plan/sorts/sort.rs | 2 +-
2 files changed, 17 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/src/execution/disk_manager.rs b/datafusion/core/src/execution/disk_manager.rs
index 80d594341..b8e5fac3b 100644
--- a/datafusion/core/src/execution/disk_manager.rs
+++ b/datafusion/core/src/execution/disk_manager.rs
@@ -74,7 +74,7 @@ pub struct DiskManager {
/// TempDirs to put temporary files in.
///
/// If `Some(vec![])` a new OS specified temporary directory will be created
- /// If `None` an error will be returned
+ /// If `None` an error will be returned (configured not to spill)
local_dirs: Mutex<Option<Vec<TempDir>>>,
}
@@ -103,12 +103,16 @@ impl DiskManager {
}
/// Return a temporary file from a randomized choice in the configured locations
- pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
+ ///
+ /// If the file can not be created for some reason, returns an
+ /// error message referencing the request description
+ pub fn create_tmp_file(&self, request_description: &str) ->
Result<NamedTempFile> {
let mut guard = self.local_dirs.lock();
let local_dirs = guard.as_mut().ok_or_else(|| {
- DataFusionError::ResourcesExhausted(
- "Cannot spill to temporary file as DiskManager is disabled".to_string(),
- )
+ DataFusionError::ResourcesExhausted(format!(
+ "Memory Exhausted while {} (DiskManager is disabled)",
+ request_description
+ ))
})?;
// Create a temporary directory if needed
@@ -116,8 +120,9 @@ impl DiskManager {
let tempdir =
tempfile::tempdir().map_err(DataFusionError::IoError)?;
debug!(
- "Created directory '{:?}' as DataFusion tempfile directory",
- tempdir.path().to_string_lossy()
+ "Created directory '{:?}' as DataFusion tempfile directory for {}",
+ tempdir.path().to_string_lossy(),
+ request_description,
);
local_dirs.push(tempdir);
@@ -160,7 +165,7 @@ mod tests {
assert_eq!(0, local_dir_snapshot(&dm).len());
// can still create a tempfile however:
- let actual = dm.create_tmp_file()?;
+ let actual = dm.create_tmp_file("Testing")?;
// Now the tempdir has been created on demand
assert_eq!(1, local_dir_snapshot(&dm).len());
@@ -192,7 +197,7 @@ mod tests {
);
let dm = DiskManager::try_new(config)?;
- let actual = dm.create_tmp_file()?;
+ let actual = dm.create_tmp_file("Testing")?;
// the file should be in one of the specified local directories
assert_path_in_dirs(actual.path(), local_dirs.into_iter());
@@ -204,10 +209,9 @@ mod tests {
fn test_disabled_disk_manager() {
let config = DiskManagerConfig::Disabled;
let manager = DiskManager::try_new(config).unwrap();
- let e = manager.create_tmp_file().unwrap_err().to_string();
assert_eq!(
- e,
- "Resources exhausted: Cannot spill to temporary file as DiskManager is disabled"
+ manager.create_tmp_file("Testing").unwrap_err().to_string(),
+ "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
)
}
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 0ea2c5c5f..0dfac7876 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -291,7 +291,7 @@ impl MemoryConsumer for ExternalSorter {
.metrics_set
.new_intermediate_tracking(partition, self.runtime.clone());
- let spillfile = self.runtime.disk_manager.create_tmp_file()?;
+ let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let stream = in_mem_partial_sort(
&mut in_mem_batches,
self.schema.clone(),