adriangb commented on code in PR #16433: URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286494820
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -319,19 +342,84 @@ impl TopK { /// (a > 2 OR (a = 2 AND b < 3)) /// ``` fn update_filter(&mut self) -> Result<()> { - let Some(filter) = &self.filter else { + // If the heap doesn't have k elements yet, we can't create thresholds + let Some(max_row) = self.heap.max() else { return Ok(()); }; - let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else { - return Ok(()); + + let new_threshold_row = &max_row.row; + + // Extract scalar values BEFORE acquiring lock to reduce critical section + let thresholds = match self.heap.get_threshold_values(&self.expr)? { + Some(t) => t, + None => return Ok(()), }; + // Extract filter expression reference before entering critical section + let filter_expr = Arc::clone(&self.filter.expr); + + // Check if we need to update the threshold (lock-free read) + let current_threshold = self.filter.threshold_row.load(); + let needs_update = match current_threshold.as_ref() { + Some(current_row) => { + // new < current means new threshold is more selective + current_row.as_slice().cmp(new_threshold_row) == Ordering::Greater + } + None => true, // No current threshold, so we need to set one + }; + + // Only proceed if we need to update + if needs_update { + // Build the filter expression OUTSIDE any synchronization + let predicate = Self::build_filter_expression(&self.expr, thresholds)?; + let new_threshold_arc = Arc::new(new_threshold_row.to_vec()); + + // Atomically update the threshold using compare-and-swap + let old_threshold = self.filter.threshold_row.compare_and_swap( + &current_threshold, + Some(Arc::clone(&new_threshold_arc)), + ); + + // Only update filter if we successfully updated the threshold + // (or if there was no previous threshold and we're the first) + let should_update_filter = + match (old_threshold.as_ref(), current_threshold.as_ref()) { + // We successfully swapped + (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true, + // We 
were the first to set it + (None, None) => true, + // Another thread updated before us, check if our threshold is still better + (Some(actual_old), _) => { + actual_old.as_slice().cmp(new_threshold_row) == Ordering::Greater Review Comment: I think you resolved this in your commits, just confirming -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org