This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fecce97 Lazy TempDir creation in DiskManager (#1695)
fecce97 is described below
commit fecce97b519cbdaa16c9974af58bb12d9d73d327
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Jan 30 08:18:09 2022 -0500
Lazy TempDir creation in DiskManager (#1695)
---
datafusion/src/execution/disk_manager.rs | 99 +++++++++++++++++++++++---------
1 file changed, 73 insertions(+), 26 deletions(-)
diff --git a/datafusion/src/execution/disk_manager.rs b/datafusion/src/execution/disk_manager.rs
index 31565fe..4486f53 100644
--- a/datafusion/src/execution/disk_manager.rs
+++ b/datafusion/src/execution/disk_manager.rs
@@ -21,8 +21,8 @@
use crate::error::{DataFusionError, Result};
use log::debug;
use rand::{thread_rng, Rng};
-use std::path::PathBuf;
use std::sync::Arc;
+use std::{path::PathBuf, sync::Mutex};
use tempfile::{Builder, NamedTempFile, TempDir};
/// Configuration for temporary disk access
@@ -67,7 +67,9 @@ impl DiskManagerConfig {
/// while processing dataset larger than available memory.
#[derive(Debug)]
pub struct DiskManager {
- local_dirs: Vec<TempDir>,
+ /// TempDirs to put temporary files in. If this list is empty, a new
+ /// OS-specified temp directory is created lazily by `create_tmp_file`.
+ local_dirs: Mutex<Vec<TempDir>>,
}
impl DiskManager {
@@ -75,31 +77,39 @@ impl DiskManager {
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
- DiskManagerConfig::NewOs => {
- let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
-
- debug!(
- "Created directory {:?} as DataFusion working directory",
- tempdir
- );
- Ok(Arc::new(Self {
- local_dirs: vec![tempdir],
- }))
- }
+ DiskManagerConfig::NewOs => Ok(Arc::new(Self {
+ local_dirs: Mutex::new(vec![]),
+ })),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(conf_dirs)?;
debug!(
"Created local dirs {:?} as DataFusion working directory",
local_dirs
);
- Ok(Arc::new(Self { local_dirs }))
+ Ok(Arc::new(Self {
+ local_dirs: Mutex::new(local_dirs),
+ }))
}
}
}
/// Return a temporary file from a randomized choice in the configured locations
pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
- create_tmp_file(&self.local_dirs)
+ let mut local_dirs = self.local_dirs.lock().unwrap();
+
+ // No directories configured (NewOs): lazily create an OS temp dir on first use
+ if local_dirs.is_empty() {
+ let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
+
+ debug!(
+ "Created directory '{}' as DataFusion tempfile directory",
+ tempdir.path().to_string_lossy()
+ );
+
+ local_dirs.push(tempdir);
+ }
+
+ create_tmp_file(&local_dirs)
}
}
@@ -129,11 +139,43 @@ fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {
#[cfg(test)]
mod tests {
+ use std::path::Path;
+
use super::*;
use crate::error::Result;
use tempfile::TempDir;
#[test]
+ fn lazy_temp_dir_creation() -> Result<()> {
+ // A default configuration should not create temp files until requested
+ let config = DiskManagerConfig::new();
+ let dm = DiskManager::try_new(config)?;
+
+ assert_eq!(0, local_dir_snapshot(&dm).len());
+
+ // can still create a tempfile however:
+ let actual = dm.create_tmp_file()?;
+
+ // Now the tempdir has been created on demand
+ assert_eq!(1, local_dir_snapshot(&dm).len());
+
+ // the returned tempfile should be in the temp directory
+ let local_dirs = local_dir_snapshot(&dm);
+ assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));
+
+ Ok(())
+ }
+
+ fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
+ dm.local_dirs
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|p| p.path().into())
+ .collect()
+ }
+
+ #[test]
fn file_in_right_dir() -> Result<()> {
let local_dir1 = TempDir::new()?;
let local_dir2 = TempDir::new()?;
@@ -147,19 +189,24 @@ mod tests {
let actual = dm.create_tmp_file()?;
// the file should be in one of the specified local directories
- let found = local_dirs.iter().any(|p| {
- actual
- .path()
+ assert_path_in_dirs(actual.path(), local_dirs.into_iter());
+
+ Ok(())
+ }
+
+ /// Asserts that one of `dirs` is an ancestor of (or equal to) `file_path`
+ fn assert_path_in_dirs<'a>(
+ file_path: &'a Path,
+ dirs: impl Iterator<Item = &'a Path>,
+ ) {
+ let dirs: Vec<&Path> = dirs.collect();
+
+ let found = dirs.iter().any(|dir| {
+ file_path
.ancestors()
- .any(|candidate_path| *p == candidate_path)
+ .any(|candidate_path| *dir == candidate_path)
});
- assert!(
- found,
- "Can't find {:?} in specified local dirs: {:?}",
- actual, local_dirs
- );
-
- Ok(())
+ assert!(found, "Can't find {:?} in dirs: {:?}", file_path, dirs);
}
}