This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new df8aa7a2e Add ability to disable DiskManager (#4330)
df8aa7a2e is described below
commit df8aa7a2e2a6f54acfbfed336b84144256fb7ff8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Nov 23 21:36:13 2022 +0000
Add ability to disable DiskManager (#4330)
---
datafusion/core/src/execution/disk_manager.rs | 53 ++++++++++++++++++---------
1 file changed, 35 insertions(+), 18 deletions(-)
diff --git a/datafusion/core/src/execution/disk_manager.rs b/datafusion/core/src/execution/disk_manager.rs
index c4fe6b416..80d594341 100644
--- a/datafusion/core/src/execution/disk_manager.rs
+++ b/datafusion/core/src/execution/disk_manager.rs
@@ -39,6 +39,9 @@ pub enum DiskManagerConfig {
/// Create a new [DiskManager] that creates temporary files within
/// the specified directories
NewSpecified(Vec<PathBuf>),
+
+ /// Disable disk manager, attempts to create temporary files will error
+ Disabled,
}
impl Default for DiskManagerConfig {
@@ -68,9 +71,11 @@ impl DiskManagerConfig {
/// while processing dataset larger than available memory.
#[derive(Debug)]
pub struct DiskManager {
- /// TempDirs to put temporary files in. A new OS specified
- /// temporary directory will be created if this list is empty.
- local_dirs: Mutex<Vec<TempDir>>,
+ /// TempDirs to put temporary files in.
+ ///
+ /// If `Some(vec![])` a new OS specified temporary directory will be created
+ /// If `None` an error will be returned
+ local_dirs: Mutex<Option<Vec<TempDir>>>,
}
impl DiskManager {
@@ -79,7 +84,7 @@ impl DiskManager {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
- local_dirs: Mutex::new(vec![]),
+ local_dirs: Mutex::new(Some(vec![])),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
@@ -88,15 +93,23 @@ impl DiskManager {
local_dirs
);
Ok(Arc::new(Self {
- local_dirs: Mutex::new(local_dirs),
+ local_dirs: Mutex::new(Some(local_dirs)),
}))
}
+ DiskManagerConfig::Disabled => Ok(Arc::new(Self {
+ local_dirs: Mutex::new(None),
+ })),
}
}
/// Return a temporary file from a randomized choice in the configured locations
pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
- let mut local_dirs = self.local_dirs.lock();
+ let mut guard = self.local_dirs.lock();
+ let local_dirs = guard.as_mut().ok_or_else(|| {
+ DataFusionError::ResourcesExhausted(
+ "Cannot spill to temporary file as DiskManager is disabled".to_string(),
+ )
+ })?;
// Create a temporary directory if needed
if local_dirs.is_empty() {
@@ -110,7 +123,10 @@ impl DiskManager {
local_dirs.push(tempdir);
}
- create_tmp_file(&local_dirs)
+ let dir_index = thread_rng().gen_range(0..local_dirs.len());
+ Builder::new()
+ .tempfile_in(&local_dirs[dir_index])
+ .map_err(DataFusionError::IoError)
}
}
@@ -127,17 +143,6 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
.collect()
}
-fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {
- let dir_index = thread_rng().gen_range(0..local_dirs.len());
- let dir = local_dirs.get(dir_index).ok_or_else(|| {
- DataFusionError::Internal("No directories available to DiskManager".into())
- })?;
-
- Builder::new()
- .tempfile_in(dir)
- .map_err(DataFusionError::IoError)
-}
-
#[cfg(test)]
mod tests {
use std::path::Path;
@@ -171,6 +176,7 @@ mod tests {
dm.local_dirs
.lock()
.iter()
+ .flatten()
.map(|p| p.path().into())
.collect()
}
@@ -194,6 +200,17 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_disabled_disk_manager() {
+ let config = DiskManagerConfig::Disabled;
+ let manager = DiskManager::try_new(config).unwrap();
+ let e = manager.create_tmp_file().unwrap_err().to_string();
+ assert_eq!(
+ e,
+ "Resources exhausted: Cannot spill to temporary file as DiskManager is disabled"
+ )
+ }
+
/// Asserts that `file_path` is found anywhere in any of `dir` directories
fn assert_path_in_dirs<'a>(
file_path: &'a Path,