This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d65394f2 Improve error messages when memory is exhausted while sorting (#4348)
0d65394f2 is described below

commit 0d65394f2d4b36938e549cf1686cef1c51813eb5
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Nov 25 13:37:49 2022 -0500

    Improve error messages when memory is exhausted while sorting (#4348)
---
 datafusion/core/src/execution/disk_manager.rs   | 28 ++++++++++++++-----------
 datafusion/core/src/physical_plan/sorts/sort.rs |  2 +-
 2 files changed, 17 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/execution/disk_manager.rs b/datafusion/core/src/execution/disk_manager.rs
index 80d594341..b8e5fac3b 100644
--- a/datafusion/core/src/execution/disk_manager.rs
+++ b/datafusion/core/src/execution/disk_manager.rs
@@ -74,7 +74,7 @@ pub struct DiskManager {
     /// TempDirs to put temporary files in.
     ///
     /// If `Some(vec![])` a new OS specified temporary directory will be 
created
-    /// If `None` an error will be returned
+    /// If `None` an error will be returned (configured not to spill)
     local_dirs: Mutex<Option<Vec<TempDir>>>,
 }
 
@@ -103,12 +103,16 @@ impl DiskManager {
     }
 
     /// Return a temporary file from a randomized choice in the configured 
locations
-    pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
+    ///
+    /// If the file can not be created for some reason, returns an
+    /// error message referencing the request description
+    pub fn create_tmp_file(&self, request_description: &str) -> 
Result<NamedTempFile> {
         let mut guard = self.local_dirs.lock();
         let local_dirs = guard.as_mut().ok_or_else(|| {
-            DataFusionError::ResourcesExhausted(
-                "Cannot spill to temporary file as DiskManager is 
disabled".to_string(),
-            )
+            DataFusionError::ResourcesExhausted(format!(
+                "Memory Exhausted while {} (DiskManager is disabled)",
+                request_description
+            ))
         })?;
 
         // Create a temporary directory if needed
@@ -116,8 +120,9 @@ impl DiskManager {
             let tempdir = 
tempfile::tempdir().map_err(DataFusionError::IoError)?;
 
             debug!(
-                "Created directory '{:?}' as DataFusion tempfile directory",
-                tempdir.path().to_string_lossy()
+                "Created directory '{:?}' as DataFusion tempfile directory for 
{}",
+                tempdir.path().to_string_lossy(),
+                request_description,
             );
 
             local_dirs.push(tempdir);
@@ -160,7 +165,7 @@ mod tests {
         assert_eq!(0, local_dir_snapshot(&dm).len());
 
         // can still create a tempfile however:
-        let actual = dm.create_tmp_file()?;
+        let actual = dm.create_tmp_file("Testing")?;
 
         // Now the tempdir has been created on demand
         assert_eq!(1, local_dir_snapshot(&dm).len());
@@ -192,7 +197,7 @@ mod tests {
         );
 
         let dm = DiskManager::try_new(config)?;
-        let actual = dm.create_tmp_file()?;
+        let actual = dm.create_tmp_file("Testing")?;
 
         // the file should be in one of the specified local directories
         assert_path_in_dirs(actual.path(), local_dirs.into_iter());
@@ -204,10 +209,9 @@ mod tests {
     fn test_disabled_disk_manager() {
         let config = DiskManagerConfig::Disabled;
         let manager = DiskManager::try_new(config).unwrap();
-        let e = manager.create_tmp_file().unwrap_err().to_string();
         assert_eq!(
-            e,
-            "Resources exhausted: Cannot spill to temporary file as 
DiskManager is disabled"
+            manager.create_tmp_file("Testing").unwrap_err().to_string(),
+            "Resources exhausted: Memory Exhausted while Testing (DiskManager 
is disabled)",
         )
     }
 
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 0ea2c5c5f..0dfac7876 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -291,7 +291,7 @@ impl MemoryConsumer for ExternalSorter {
             .metrics_set
             .new_intermediate_tracking(partition, self.runtime.clone());
 
-        let spillfile = self.runtime.disk_manager.create_tmp_file()?;
+        let spillfile = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let stream = in_mem_partial_sort(
             &mut in_mem_batches,
             self.schema.clone(),

Reply via email to