This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new df8aa7a2e Add ability to disable DiskManager (#4330)
df8aa7a2e is described below

commit df8aa7a2e2a6f54acfbfed336b84144256fb7ff8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Nov 23 21:36:13 2022 +0000

    Add ability to disable DiskManager (#4330)
---
 datafusion/core/src/execution/disk_manager.rs | 53 ++++++++++++++++++---------
 1 file changed, 35 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/src/execution/disk_manager.rs b/datafusion/core/src/execution/disk_manager.rs
index c4fe6b416..80d594341 100644
--- a/datafusion/core/src/execution/disk_manager.rs
+++ b/datafusion/core/src/execution/disk_manager.rs
@@ -39,6 +39,9 @@ pub enum DiskManagerConfig {
     /// Create a new [DiskManager] that creates temporary files within
     /// the specified directories
     NewSpecified(Vec<PathBuf>),
+
+    /// Disable disk manager, attempts to create temporary files will error
+    Disabled,
 }
 
 impl Default for DiskManagerConfig {
@@ -68,9 +71,11 @@ impl DiskManagerConfig {
 /// while processing dataset larger than available memory.
 #[derive(Debug)]
 pub struct DiskManager {
-    /// TempDirs to put temporary files in. A new OS specified
-    /// temporary directory will be created if this list is empty.
-    local_dirs: Mutex<Vec<TempDir>>,
+    /// TempDirs to put temporary files in.
+    ///
+    /// If `Some(vec![])` a new OS specified temporary directory will be 
created
+    /// If `None` an error will be returned
+    local_dirs: Mutex<Option<Vec<TempDir>>>,
 }
 
 impl DiskManager {
@@ -79,7 +84,7 @@ impl DiskManager {
         match config {
             DiskManagerConfig::Existing(manager) => Ok(manager),
             DiskManagerConfig::NewOs => Ok(Arc::new(Self {
-                local_dirs: Mutex::new(vec![]),
+                local_dirs: Mutex::new(Some(vec![])),
             })),
             DiskManagerConfig::NewSpecified(conf_dirs) => {
                 let local_dirs = create_local_dirs(conf_dirs)?;
@@ -88,15 +93,23 @@ impl DiskManager {
                     local_dirs
                 );
                 Ok(Arc::new(Self {
-                    local_dirs: Mutex::new(local_dirs),
+                    local_dirs: Mutex::new(Some(local_dirs)),
                 }))
             }
+            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
+                local_dirs: Mutex::new(None),
+            })),
         }
     }
 
     /// Return a temporary file from a randomized choice in the configured 
locations
     pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
-        let mut local_dirs = self.local_dirs.lock();
+        let mut guard = self.local_dirs.lock();
+        let local_dirs = guard.as_mut().ok_or_else(|| {
+            DataFusionError::ResourcesExhausted(
+                "Cannot spill to temporary file as DiskManager is 
disabled".to_string(),
+            )
+        })?;
 
         // Create a temporary directory if needed
         if local_dirs.is_empty() {
@@ -110,7 +123,10 @@ impl DiskManager {
             local_dirs.push(tempdir);
         }
 
-        create_tmp_file(&local_dirs)
+        let dir_index = thread_rng().gen_range(0..local_dirs.len());
+        Builder::new()
+            .tempfile_in(&local_dirs[dir_index])
+            .map_err(DataFusionError::IoError)
     }
 }
 
@@ -127,17 +143,6 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
         .collect()
 }
 
-fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {
-    let dir_index = thread_rng().gen_range(0..local_dirs.len());
-    let dir = local_dirs.get(dir_index).ok_or_else(|| {
-        DataFusionError::Internal("No directories available to 
DiskManager".into())
-    })?;
-
-    Builder::new()
-        .tempfile_in(dir)
-        .map_err(DataFusionError::IoError)
-}
-
 #[cfg(test)]
 mod tests {
     use std::path::Path;
@@ -171,6 +176,7 @@ mod tests {
         dm.local_dirs
             .lock()
             .iter()
+            .flatten()
             .map(|p| p.path().into())
             .collect()
     }
@@ -194,6 +200,17 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_disabled_disk_manager() {
+        let config = DiskManagerConfig::Disabled;
+        let manager = DiskManager::try_new(config).unwrap();
+        let e = manager.create_tmp_file().unwrap_err().to_string();
+        assert_eq!(
+            e,
+            "Resources exhausted: Cannot spill to temporary file as 
DiskManager is disabled"
+        )
+    }
+
     /// Asserts that `file_path` is found anywhere in any of `dir` directories
     fn assert_path_in_dirs<'a>(
         file_path: &'a Path,

Reply via email to