kosiew commented on code in PR #22246:
URL: https://github.com/apache/datafusion/pull/22246#discussion_r3286523260


##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -210,53 +211,76 @@ impl DiskManager {
                 );
                 Ok(Arc::new(Self {
                     local_dirs: Mutex::new(Some(local_dirs)),
-                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    max_temp_directory_size: AtomicU64::new(
+                        DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    ),
                     used_disk_space: Arc::new(AtomicU64::new(0)),
                     active_files_count: Arc::new(AtomicUsize::new(0)),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
-                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
                 used_disk_space: Arc::new(AtomicU64::new(0)),
                 active_files_count: Arc::new(AtomicUsize::new(0)),
             })),
         }
     }
 
+    /// Set the max temp directory size. Requires exclusive access.
+    ///
+    /// Prefer [`Self::update_max_temp_directory_size`] which takes `&self` and
+    /// works through `Arc` without exclusive access.
+    #[deprecated(
+        since = "54.0.0",
+        note = "Use `update_max_temp_directory_size` instead, which takes &self and works through Arc."
+    )]
     pub fn set_max_temp_directory_size(

Review Comment:
   `set_max_temp_directory_size` still takes `&mut self` and is deprecated, but 
the PR contract says the limit should be dynamically adjustable through shared 
access.
   
   As written, code that has the production `Arc<DiskManager>` still cannot 
call `disk_manager.set_max_temp_directory_size(...)`; it has to know about and 
use the new `update_*` method instead. Could you please change this public 
setter to take `&self` and do the atomic store directly?
   
   It would also be good to add a regression test that calls 
`set_max_temp_directory_size` on an `Arc<DiskManager>` through shared access.
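   A minimal sketch of the requested change. It assumes a heavily simplified, hypothetical `SketchDiskManager` that keeps only the limit and a `local_dirs` stand-in. It uses `std::sync::Mutex`, hence the `.unwrap()` the real code does not need, and it returns `Result<(), String>` instead of DataFusion's `Result`. The setter keeps its existing public name but takes `&self` and performs the atomic store directly, so it can be called on an `Arc`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified stand-in for `DiskManager`; only the fields
/// needed to show a shared-access setter are kept.
pub struct SketchDiskManager {
    /// `None` models `DiskManagerConfig::Disabled` (spilling disabled).
    local_dirs: Mutex<Option<Vec<String>>>,
    /// Spill limit in bytes; atomic so it can change behind an `Arc`.
    max_temp_directory_size: AtomicU64,
}

impl SketchDiskManager {
    pub fn new(spilling_enabled: bool, limit: u64) -> Arc<Self> {
        let dirs = if spilling_enabled { Some(Vec::new()) } else { None };
        Arc::new(Self {
            local_dirs: Mutex::new(dirs),
            max_temp_directory_size: AtomicU64::new(limit),
        })
    }

    /// Same name as the existing public setter, but `&self` instead of
    /// `&mut self`, so it is callable directly on an `Arc<SketchDiskManager>`.
    pub fn set_max_temp_directory_size(&self, max: u64) -> Result<(), String> {
        // Same guard as in the PR: a non-zero limit is meaningless when
        // spilling is disabled.
        if self.local_dirs.lock().unwrap().is_none() && max != 0 {
            return Err(
                "Cannot set max temp directory size for a disk manager that spilling is disabled"
                    .to_string(),
            );
        }
        self.max_temp_directory_size.store(max, Ordering::Relaxed);
        Ok(())
    }

    pub fn max_temp_directory_size(&self) -> u64 {
        self.max_temp_directory_size.load(Ordering::Relaxed)
    }
}
```

   Because method calls auto-reference, most existing callers that hold a `&mut DiskManager` would still compile against a `&self` signature. A regression test along these lines would clone the `Arc`, call the setter through the clone, and check the new value through the original handle.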



##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -418,11 +442,15 @@ impl RefCountedTempFile {
 
         // 3. Check if the updated global disk usage exceeds the configured limit
         let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
-        if global_disk_usage > self.disk_manager.max_temp_directory_size {
+        let limit = self

Review Comment:
   I think lowering the limit below current usage can leave `used_disk_space` 
permanently inflated after a failed spill update.
   
   `update_disk_usage` subtracts the old size, adds the new size, then returns 
an error when `global_disk_usage > limit` before storing 
`current_file_disk_usage = new_disk_usage`. If that file is later dropped, 
`Drop` subtracts only the old value, not the new value that was already added 
to the global counter.
   
   That breaks the stated invariant that usage naturally drops as existing 
spill files are released. Could you please either roll back the global counter 
on error or record the new per-file size before returning?
   
   Please also add a real temp-file test that lowers the limit, attempts 
`update_disk_usage`, drops the file, and verifies `used_disk_space()` returns 
to zero.
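   One way to implement the second option is sketched below. It uses hypothetical, stripped-down `SketchDiskManager` and `SketchTempFile` types, not DataFusion's actual `RefCountedTempFile`. The fix is to record the new per-file size as soon as it is added to the global counter. That way `Drop` always subtracts exactly what this file contributed, even when the limit check that follows fails.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the shared counters inside `DiskManager`.
pub struct SketchDiskManager {
    max_temp_directory_size: AtomicU64,
    used_disk_space: AtomicU64,
}

impl SketchDiskManager {
    pub fn new(limit: u64) -> Arc<Self> {
        Arc::new(Self {
            max_temp_directory_size: AtomicU64::new(limit),
            used_disk_space: AtomicU64::new(0),
        })
    }

    pub fn set_max_temp_directory_size(&self, max: u64) {
        self.max_temp_directory_size.store(max, Ordering::Relaxed);
    }

    pub fn used_disk_space(&self) -> u64 {
        self.used_disk_space.load(Ordering::Relaxed)
    }
}

/// Hypothetical stand-in for `RefCountedTempFile`: it tracks only how many
/// bytes this file has added to the global counter.
pub struct SketchTempFile {
    disk_manager: Arc<SketchDiskManager>,
    current_file_disk_usage: u64,
}

impl SketchTempFile {
    pub fn new(disk_manager: &Arc<SketchDiskManager>) -> Self {
        Self { disk_manager: Arc::clone(disk_manager), current_file_disk_usage: 0 }
    }

    pub fn update_disk_usage(&mut self, new_disk_usage: u64) -> Result<(), String> {
        let dm = &self.disk_manager;
        // 1-2. Replace this file's old contribution with the new one.
        dm.used_disk_space.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
        dm.used_disk_space.fetch_add(new_disk_usage, Ordering::Relaxed);
        // Record the new size before any early return, so `Drop`
        // subtracts exactly what was added above.
        self.current_file_disk_usage = new_disk_usage;

        // 3. Check the updated global usage against the current limit.
        let global_disk_usage = dm.used_disk_space.load(Ordering::Relaxed);
        let limit = dm.max_temp_directory_size.load(Ordering::Relaxed);
        if global_disk_usage > limit {
            return Err(format!("spill usage {global_disk_usage} exceeds limit {limit}"));
        }
        Ok(())
    }
}

impl Drop for SketchTempFile {
    fn drop(&mut self) {
        // Release this file's whole contribution to the global counter.
        self.disk_manager
            .used_disk_space
            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
    }
}
```

   Rolling the global counter back on error would also keep it balanced. However, assuming the file's bytes stay on disk after the failed update, a rollback would make the counter under-report real usage until the file is dropped. Recording the new size keeps the counter in line with what is actually on disk.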



##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -210,53 +211,76 @@ impl DiskManager {
                 );
                 Ok(Arc::new(Self {
                     local_dirs: Mutex::new(Some(local_dirs)),
-                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    max_temp_directory_size: AtomicU64::new(
+                        DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                    ),
                     used_disk_space: Arc::new(AtomicU64::new(0)),
                     active_files_count: Arc::new(AtomicUsize::new(0)),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
-                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+                max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
                 used_disk_space: Arc::new(AtomicU64::new(0)),
                 active_files_count: Arc::new(AtomicUsize::new(0)),
             })),
         }
     }
 
+    /// Set the max temp directory size. Requires exclusive access.
+    ///
+    /// Prefer [`Self::update_max_temp_directory_size`] which takes `&self` and
+    /// works through `Arc` without exclusive access.
+    #[deprecated(
+        since = "54.0.0",
+        note = "Use `update_max_temp_directory_size` instead, which takes &self and works through Arc."
+    )]
     pub fn set_max_temp_directory_size(
         &mut self,
         max_temp_directory_size: u64,
     ) -> Result<()> {
-        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
-        // this operation is not meaningful, fail early.
+        self.update_max_temp_directory_size(max_temp_directory_size)
+    }
+
+    /// Atomically update the max temp directory size at runtime.
+    ///
+    /// Takes `&self` (not `&mut self`), so it works through `Arc<DiskManager>`
+    /// without requiring exclusive access. Takes effect immediately for
+    /// subsequent spill writes.
+    ///
+    /// Use this when you need to adjust the limit dynamically while queries
+    /// are running (e.g., adapting to available disk space).
+    pub fn update_max_temp_directory_size(
+        &self,
+        max_temp_directory_size: u64,
+    ) -> Result<()> {
         if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
             return config_err!(
                 "Cannot set max temp directory size for a disk manager that spilling is disabled"
             );
         }
 
-        self.max_temp_directory_size = max_temp_directory_size;
+        self.max_temp_directory_size

Review Comment:
   Small suggestion: `Ordering::Relaxed` may be enough for this limit's 
store/load, unless some other memory is being synchronized through this atomic.
   
   The limit looks like a standalone atomic value, so Acquire/Release may 
suggest ordering guarantees that the surrounding code does not seem to rely on.
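   To illustrate, here is a sketch with a hypothetical `SpillLimit` wrapper, not the PR's code. With `Relaxed`, each store and load is still atomic, and all threads agree on a single modification order for the limit. What `Relaxed` gives up is ordering relative to other memory locations, and nothing here depends on that. Acquire/Release on the limit also would not turn the `used_disk_space` load and the limit load into a consistent snapshot, because they are two separate atomics.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical wrapper around a standalone spill limit. No other data is
/// published through this atomic, so Relaxed is sufficient: every access is
/// still atomic, and all threads agree on one order of stores to it.
pub struct SpillLimit(AtomicU64);

impl SpillLimit {
    pub fn new(bytes: u64) -> Self {
        Self(AtomicU64::new(bytes))
    }

    pub fn set(&self, bytes: u64) {
        self.0.store(bytes, Ordering::Relaxed);
    }

    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Lowers the limit from another thread. `join` establishes happens-before,
/// so the Relaxed load that follows is guaranteed to see the new value.
pub fn lower_limit_from_other_thread(limit: &Arc<SpillLimit>, bytes: u64) -> u64 {
    let shared = Arc::clone(limit);
    thread::spawn(move || shared.set(bytes)).join().unwrap();
    limit.get()
}
```

   In the helper, cross-thread visibility comes from `join()`, not from the store's memory ordering.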



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
