This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 284514308a Fix a race condition issue on reading spilled file (#7538)
284514308a is described below
commit 284514308a6ea5fb2beb582fcd888e2859871f53
Author: Kousuke Saruta <[email protected]>
AuthorDate: Fri Sep 15 03:45:02 2023 +0900
Fix a race condition issue on reading spilled file (#7538)
* Fix the race condition issue
* Fix format
* Make tempdir live long enough
* Improve comment
* Fix doc
* More comment improvement
---
datafusion/core/src/physical_plan/sorts/sort.rs | 6 +-
datafusion/execution/src/disk_manager.rs | 81 ++++++++++++++++++++++---
2 files changed, 76 insertions(+), 11 deletions(-)
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 14deb637ac..17b94d51c5 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -36,6 +36,7 @@ use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
+use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{
human_readable_size, MemoryConsumer, MemoryReservation,
};
@@ -51,7 +52,6 @@ use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use tempfile::NamedTempFile;
use tokio::sync::mpsc::Sender;
use tokio::task;
@@ -202,7 +202,7 @@ struct ExternalSorter {
in_mem_batches_sorted: bool,
/// If data has previously been spilled, the locations of the
/// spill files (in Arrow IPC format)
- spills: Vec<NamedTempFile>,
+ spills: Vec<RefCountedTempFile>,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
/// Runtime metrics
@@ -609,7 +609,7 @@ async fn spill_sorted_batches(
}
fn read_spill_as_stream(
- path: NamedTempFile,
+ path: RefCountedTempFile,
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let mut builder = RecordBatchReceiverStream::builder(schema, 2);
diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index ecae698523..fa9a75b2f4 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -22,7 +22,7 @@ use datafusion_common::{DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::{Builder, NamedTempFile, TempDir};
@@ -75,7 +75,7 @@ pub struct DiskManager {
///
/// If `Some(vec![])` a new OS specified temporary directory will be created
/// If `None` an error will be returned (configured not to spill)
- local_dirs: Mutex<Option<Vec<TempDir>>>,
+ local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
}
impl DiskManager {
@@ -113,7 +113,10 @@ impl DiskManager {
///
/// If the file can not be created for some reason, returns an
/// error message referencing the request description
- pub fn create_tmp_file(&self, request_description: &str) -> Result<NamedTempFile> {
+ pub fn create_tmp_file(
+ &self,
+ request_description: &str,
+ ) -> Result<RefCountedTempFile> {
let mut guard = self.local_dirs.lock();
let local_dirs = guard.as_mut().ok_or_else(|| {
DataFusionError::ResourcesExhausted(format!(
@@ -131,18 +134,42 @@ impl DiskManager {
request_description,
);
- local_dirs.push(tempdir);
+ local_dirs.push(Arc::new(tempdir));
}
let dir_index = thread_rng().gen_range(0..local_dirs.len());
- Builder::new()
- .tempfile_in(&local_dirs[dir_index])
- .map_err(DataFusionError::IoError)
+ Ok(RefCountedTempFile {
+ parent_temp_dir: local_dirs[dir_index].clone(),
+ tempfile: Builder::new()
+ .tempfile_in(local_dirs[dir_index].as_ref())
+ .map_err(DataFusionError::IoError)?,
+ })
+ }
+}
+
+/// A wrapper around a [`NamedTempFile`] that also contains
+/// a reference to its parent temporary directory
+#[derive(Debug)]
+pub struct RefCountedTempFile {
+ /// The reference to the directory in which temporary files are created to ensure
+ /// it is not cleaned up prior to the NamedTempFile
+ #[allow(dead_code)]
+ parent_temp_dir: Arc<TempDir>,
+ tempfile: NamedTempFile,
+}
+
+impl RefCountedTempFile {
+ pub fn path(&self) -> &Path {
+ self.tempfile.path()
+ }
+
+ pub fn inner(&self) -> &NamedTempFile {
+ &self.tempfile
}
}
/// Setup local dirs by creating one new dir in each of the given dirs
-fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
+fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
local_dirs
.iter()
.map(|root| {
@@ -154,6 +181,7 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
.tempdir_in(root)
.map_err(DataFusionError::IoError)
})
+ .map(|result| result.map(Arc::new))
.collect()
}
@@ -250,4 +278,41 @@ mod tests {
assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
}
+
+ #[test]
+ fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
+ // Test for the case using OS arranged temporary directory
+ let config = DiskManagerConfig::new();
+ let dm = DiskManager::try_new(config)?;
+ let temp_file = dm.create_tmp_file("Testing")?;
+ let temp_file_path = temp_file.path().to_owned();
+ assert!(temp_file_path.exists());
+
+ drop(dm);
+ assert!(temp_file_path.exists());
+
+ drop(temp_file);
+ assert!(!temp_file_path.exists());
+
+ // Test for the case using specified directories
+ let local_dir1 = TempDir::new()?;
+ let local_dir2 = TempDir::new()?;
+ let local_dir3 = TempDir::new()?;
+ let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
+ let config = DiskManagerConfig::new_specified(
+ local_dirs.iter().map(|p| p.into()).collect(),
+ );
+ let dm = DiskManager::try_new(config)?;
+ let temp_file = dm.create_tmp_file("Testing")?;
+ let temp_file_path = temp_file.path().to_owned();
+ assert!(temp_file_path.exists());
+
+ drop(dm);
+ assert!(temp_file_path.exists());
+
+ drop(temp_file);
+ assert!(!temp_file_path.exists());
+
+ Ok(())
+ }
}