xanderbailey commented on code in PR #20672:
URL: https://github.com/apache/datafusion/pull/20672#discussion_r2886623707


##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -93,6 +89,37 @@ impl SpillPoolShared {
     }
 }
 
+/// Tracks the number of live [`SpillPoolWriter`] clones.
+///
+/// Cloning increments the count; dropping decrements it.
+/// [`WriterCount::is_last`] returns `true` when called from the final clone,
+/// which the writer uses to decide whether to finalize the spill pool.
+struct WriterCount(Arc<AtomicUsize>);
+
+impl WriterCount {
+    fn new() -> Self {
+        Self(Arc::new(AtomicUsize::new(1)))
+    }
+
+    /// Returns `true` if this is the only remaining clone.
+    fn is_last(&self) -> bool {
+        self.0.load(Ordering::Acquire) == 1
+    }
+}
+
+impl Clone for WriterCount {
+    fn clone(&self) -> Self {
+        self.0.fetch_add(1, Ordering::Relaxed);

Review Comment:
   Suppose thread A clones (increment) while thread B drops and calls 
`is_last()`. If B doesn't see A's increment yet, B thinks it's the last writer 
and finalizes prematurely. I think today the control flow prevents this from 
happening: `setup_input_partitions` clones, and the clones are passed through a 
task spawn boundary before anything can be dropped.
   
   That being said, this might not always be true, so I think `SeqCst` is 
safer.
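
   Roughly what I have in mind, as a minimal sketch rather than the final code. 
The `Drop` impl is my assumption, since it is not part of this hunk (the doc 
comment only says that dropping decrements the count). The only intended change 
from the diff is the ordering used on each atomic operation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Sketch of the counter from the diff with `SeqCst` on every operation.
struct WriterCount(Arc<AtomicUsize>);

impl WriterCount {
    fn new() -> Self {
        // The first writer counts as one live clone.
        Self(Arc::new(AtomicUsize::new(1)))
    }

    /// Returns `true` if the counter currently reads 1.
    fn is_last(&self) -> bool {
        self.0.load(Ordering::SeqCst) == 1
    }
}

impl Clone for WriterCount {
    fn clone(&self) -> Self {
        // Increment before handing out the new handle.
        self.0.fetch_add(1, Ordering::SeqCst);
        Self(Arc::clone(&self.0))
    }
}

// Assumption: not shown in this hunk; written here to match the doc
// comment "dropping decrements it".
impl Drop for WriterCount {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}
```

   With this, `is_last()` on the original handle returns `false` while a clone 
is alive and `true` again once the clone has been dropped.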



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

