This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 284514308a Fix a race condition issue on reading spilled file (#7538)
284514308a is described below

commit 284514308a6ea5fb2beb582fcd888e2859871f53
Author: Kousuke Saruta <[email protected]>
AuthorDate: Fri Sep 15 03:45:02 2023 +0900

    Fix a race condition issue on reading spilled file (#7538)
    
    * Fix the race condition issue
    
    * Fix format
    
    * Make tempdir live long enough
    
    * Improve comment
    
    * Fix doc
    
    * More comment improvement
---
 datafusion/core/src/physical_plan/sorts/sort.rs |  6 +-
 datafusion/execution/src/disk_manager.rs        | 81 ++++++++++++++++++++++---
 2 files changed, 76 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 14deb637ac..17b94d51c5 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -36,6 +36,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
+use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{
     human_readable_size, MemoryConsumer, MemoryReservation,
 };
@@ -51,7 +52,6 @@ use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use tempfile::NamedTempFile;
 use tokio::sync::mpsc::Sender;
 use tokio::task;
 
@@ -202,7 +202,7 @@ struct ExternalSorter {
     in_mem_batches_sorted: bool,
     /// If data has previously been spilled, the locations of the
     /// spill files (in Arrow IPC format)
-    spills: Vec<NamedTempFile>,
+    spills: Vec<RefCountedTempFile>,
     /// Sort expressions
     expr: Arc<[PhysicalSortExpr]>,
     /// Runtime metrics
@@ -609,7 +609,7 @@ async fn spill_sorted_batches(
 }
 
 fn read_spill_as_stream(
-    path: NamedTempFile,
+    path: RefCountedTempFile,
     schema: SchemaRef,
 ) -> Result<SendableRecordBatchStream> {
     let mut builder = RecordBatchReceiverStream::builder(schema, 2);
diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index ecae698523..fa9a75b2f4 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -22,7 +22,7 @@ use datafusion_common::{DataFusionError, Result};
 use log::debug;
 use parking_lot::Mutex;
 use rand::{thread_rng, Rng};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tempfile::{Builder, NamedTempFile, TempDir};
 
@@ -75,7 +75,7 @@ pub struct DiskManager {
     ///
     /// If `Some(vec![])` a new OS specified temporary directory will be created
     /// If `None` an error will be returned (configured not to spill)
-    local_dirs: Mutex<Option<Vec<TempDir>>>,
+    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
 }
 
 impl DiskManager {
@@ -113,7 +113,10 @@ impl DiskManager {
     ///
     /// If the file can not be created for some reason, returns an
     /// error message referencing the request description
-    pub fn create_tmp_file(&self, request_description: &str) -> Result<NamedTempFile> {
+    pub fn create_tmp_file(
+        &self,
+        request_description: &str,
+    ) -> Result<RefCountedTempFile> {
         let mut guard = self.local_dirs.lock();
         let local_dirs = guard.as_mut().ok_or_else(|| {
             DataFusionError::ResourcesExhausted(format!(
@@ -131,18 +134,42 @@ impl DiskManager {
                 request_description,
             );
 
-            local_dirs.push(tempdir);
+            local_dirs.push(Arc::new(tempdir));
         }
 
         let dir_index = thread_rng().gen_range(0..local_dirs.len());
-        Builder::new()
-            .tempfile_in(&local_dirs[dir_index])
-            .map_err(DataFusionError::IoError)
+        Ok(RefCountedTempFile {
+            parent_temp_dir: local_dirs[dir_index].clone(),
+            tempfile: Builder::new()
+                .tempfile_in(local_dirs[dir_index].as_ref())
+                .map_err(DataFusionError::IoError)?,
+        })
+    }
+}
+
+/// A wrapper around a [`NamedTempFile`] that also contains
+/// a reference to its parent temporary directory
+#[derive(Debug)]
+pub struct RefCountedTempFile {
+    /// The reference to the directory in which temporary files are created to ensure
+    /// it is not cleaned up prior to the NamedTempFile
+    #[allow(dead_code)]
+    parent_temp_dir: Arc<TempDir>,
+    tempfile: NamedTempFile,
+}
+
+impl RefCountedTempFile {
+    pub fn path(&self) -> &Path {
+        self.tempfile.path()
+    }
+
+    pub fn inner(&self) -> &NamedTempFile {
+        &self.tempfile
     }
 }
 
 /// Setup local dirs by creating one new dir in each of the given dirs
-fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
+fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
     local_dirs
         .iter()
         .map(|root| {
@@ -154,6 +181,7 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
                 .tempdir_in(root)
                 .map_err(DataFusionError::IoError)
         })
+        .map(|result| result.map(Arc::new))
         .collect()
 }
 
@@ -250,4 +278,41 @@ mod tests {
 
         assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
     }
+
+    #[test]
+    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
+        // Test for the case using OS arranged temporary directory
+        let config = DiskManagerConfig::new();
+        let dm = DiskManager::try_new(config)?;
+        let temp_file = dm.create_tmp_file("Testing")?;
+        let temp_file_path = temp_file.path().to_owned();
+        assert!(temp_file_path.exists());
+
+        drop(dm);
+        assert!(temp_file_path.exists());
+
+        drop(temp_file);
+        assert!(!temp_file_path.exists());
+
+        // Test for the case using specified directories
+        let local_dir1 = TempDir::new()?;
+        let local_dir2 = TempDir::new()?;
+        let local_dir3 = TempDir::new()?;
+        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
+        let config = DiskManagerConfig::new_specified(
+            local_dirs.iter().map(|p| p.into()).collect(),
+        );
+        let dm = DiskManager::try_new(config)?;
+        let temp_file = dm.create_tmp_file("Testing")?;
+        let temp_file_path = temp_file.path().to_owned();
+        assert!(temp_file_path.exists());
+
+        drop(dm);
+        assert!(temp_file_path.exists());
+
+        drop(temp_file);
+        assert!(!temp_file_path.exists());
+
+        Ok(())
+    }
 }

Reply via email to