adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286494820


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -319,19 +342,84 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? 
else {
-            return Ok(());
+
+        let new_threshold_row = &max_row.row;
+
+        // Extract scalar values BEFORE acquiring lock to reduce critical 
section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+            Some(t) => t,
+            None => return Ok(()),
         };
 
+        // Extract filter expression reference before entering critical section
+        let filter_expr = Arc::clone(&self.filter.expr);
+
+        // Check if we need to update the threshold (lock-free read)
+        let current_threshold = self.filter.threshold_row.load();
+        let needs_update = match current_threshold.as_ref() {
+            Some(current_row) => {
+                // new < current means new threshold is more selective
+                current_row.as_slice().cmp(new_threshold_row) == 
Ordering::Greater
+            }
+            None => true, // No current threshold, so we need to set one
+        };
+
+        // Only proceed if we need to update
+        if needs_update {
+            // Build the filter expression OUTSIDE any synchronization
+            let predicate = Self::build_filter_expression(&self.expr, 
thresholds)?;
+            let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
+
+            // Atomically update the threshold using compare-and-swap
+            let old_threshold = self.filter.threshold_row.compare_and_swap(
+                &current_threshold,
+                Some(Arc::clone(&new_threshold_arc)),
+            );
+
+            // Only update filter if we successfully updated the threshold
+            // (or if there was no previous threshold and we're the first)
+            let should_update_filter =
+                match (old_threshold.as_ref(), current_threshold.as_ref()) {
+                    // We successfully swapped
+                    (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) 
=> true,
+                    // We were the first to set it
+                    (None, None) => true,
+                    // Another thread updated before us, check if our 
threshold is still better
+                    (Some(actual_old), _) => {
+                        actual_old.as_slice().cmp(new_threshold_row) == 
Ordering::Greater

Review Comment:
   I think you resolved this in your commits; just confirming.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to