xanderbailey commented on code in PR #20672:
URL: https://github.com/apache/datafusion/pull/20672#discussion_r2886623707
##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -93,6 +89,37 @@ impl SpillPoolShared {
}
}
+/// Tracks the number of live [`SpillPoolWriter`] clones.
+///
+/// Cloning increments the count; dropping decrements it.
+/// [`WriterCount::is_last`] returns `true` when called from the final clone,
+/// which the writer uses to decide whether to finalize the spill pool.
+struct WriterCount(Arc<AtomicUsize>);
+
+impl WriterCount {
+ fn new() -> Self {
+ Self(Arc::new(AtomicUsize::new(1)))
+ }
+
+ /// Returns `true` if this is the only remaining clone.
+ fn is_last(&self) -> bool {
+ self.0.load(Ordering::Acquire) == 1
+ }
+}
+
+impl Clone for WriterCount {
+ fn clone(&self) -> Self {
+ self.0.fetch_add(1, Ordering::Relaxed);
Review Comment:
With `Relaxed` here: thread A clones (incrementing the count), then thread B
drops its writer and calls `is_last()` but does not yet observe A's increment,
so B concludes it is the last writer and finalizes the spill pool prematurely.
I think the current control flow prevents this: `setup_input_partitions` clones
the writer and passes the clone across a task spawn boundary before anything
can be dropped.
That said, this may not always hold, so I think `SeqCst` is the safer choice.
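For comparison, here is a minimal, self-contained sketch of `WriterCount` with every operation switched to `SeqCst`, as suggested above. It is not the PR's code: it assumes a `Drop` impl that decrements with `fetch_sub` (the diff excerpt is cut off before it), and `spawn_and_check` is a hypothetical helper that clones before spawning, mirroring what `setup_input_partitions` is described as doing.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Sketch of `WriterCount` using `SeqCst` for every operation.
struct WriterCount(Arc<AtomicUsize>);

impl WriterCount {
    fn new() -> Self {
        Self(Arc::new(AtomicUsize::new(1)))
    }

    /// Returns `true` if this is the only remaining clone.
    fn is_last(&self) -> bool {
        self.0.load(Ordering::SeqCst) == 1
    }

    /// Hypothetical helper for inspection: current number of live clones.
    fn count(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
}

impl Clone for WriterCount {
    fn clone(&self) -> Self {
        // SeqCst instead of Relaxed: this increment takes part in the single
        // total order shared by all SeqCst operations.
        self.0.fetch_add(1, Ordering::SeqCst);
        Self(Arc::clone(&self.0))
    }
}

// Assumed Drop impl (not shown in the diff excerpt): decrement on drop.
impl Drop for WriterCount {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Hypothetical helper: clones `writer` once per task *before* spawning.
/// Each task records whether it saw itself as the last writer, then drops
/// its clone. Returns the per-task results in spawn order.
fn spawn_and_check(writer: &WriterCount, tasks: usize) -> Vec<bool> {
    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            // The increment happens-before the spawned thread starts.
            let w = writer.clone();
            thread::spawn(move || {
                let last = w.is_last();
                drop(w); // decrement
                last
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Because each clone is created before its thread is spawned, and the caller keeps its own writer alive until every join completes, no task can observe a count of 1, so every task reports `false`; after the joins, the caller's `is_last()` returns `true`.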
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.