alamb commented on code in PR #16433: URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286140587
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -214,41 +239,39 @@ impl TopK { let mut selected_rows = None; - if let Some(filter) = self.filter.as_ref() { - // If a filter is provided, update it with the new rows - let filter = filter.current()?; - let filtered = filter.evaluate(&batch)?; - let num_rows = batch.num_rows(); - let array = filtered.into_array(num_rows)?; - let mut filter = array.as_boolean().clone(); - let true_count = filter.true_count(); - if true_count == 0 { - // nothing to filter, so no need to update - return Ok(()); + // If a filter is provided, update it with the new rows + let filter = self.filter.expr.current()?; + let filtered = filter.evaluate(&batch)?; Review Comment: A minor nit for a follow-on PR would be to move this code into its own function, such as `TopK::update_dynamic_filter`, to make it easier to understand. ########## datafusion/physical-expr/src/expressions/dynamic_filters.rs: ########## @@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr { Self { children, remapped_children: None, // Initially no remapped children - inner: Arc::new(RwLock::new(Inner::new(inner))), + inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))), Review Comment: I made a PR to show what this looks like without `arc_swap`: https://github.com/apache/datafusion/pull/17245 ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -121,13 +122,37 @@ pub struct TopK { /// Common sort prefix between the input and the sort expressions to allow early exit optimization common_sort_prefix: Arc<[PhysicalSortExpr]>, /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown - filter: Option<Arc<DynamicFilterPhysicalExpr>>, + filter: TopKDynamicFilters, /// If true, indicates that all rows of subsequent batches are guaranteed /// to be greater (by byte order, after row conversion) than the top K, /// which means the top K won't change and the computation can be finished early. 
pub(crate) finished: bool, } +#[derive(Debug, Clone)] +pub struct TopKDynamicFilters { Review Comment: Does it need to be `pub`? ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -319,19 +342,84 @@ impl TopK { /// (a > 2 OR (a = 2 AND b < 3)) /// ``` fn update_filter(&mut self) -> Result<()> { - let Some(filter) = &self.filter else { + // If the heap doesn't have k elements yet, we can't create thresholds + let Some(max_row) = self.heap.max() else { return Ok(()); }; - let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else { - return Ok(()); + + let new_threshold_row = &max_row.row; + + // Extract scalar values BEFORE acquiring lock to reduce critical section + let thresholds = match self.heap.get_threshold_values(&self.expr)? { + Some(t) => t, + None => return Ok(()), }; + // Extract filter expression reference before entering critical section + let filter_expr = Arc::clone(&self.filter.expr); + + // Check if we need to update the threshold (lock-free read) + let current_threshold = self.filter.threshold_row.load(); + let needs_update = match current_threshold.as_ref() { + Some(current_row) => { + // new < current means new threshold is more selective + current_row.as_slice().cmp(new_threshold_row) == Ordering::Greater + } + None => true, // No current threshold, so we need to set one + }; + + // Only proceed if we need to update + if needs_update { + // Build the filter expression OUTSIDE any synchronization + let predicate = Self::build_filter_expression(&self.expr, thresholds)?; + let new_threshold_arc = Arc::new(new_threshold_row.to_vec()); + + // Atomically update the threshold using compare-and-swap + let old_threshold = self.filter.threshold_row.compare_and_swap( + &current_threshold, + Some(Arc::clone(&new_threshold_arc)), + ); + + // Only update filter if we successfully updated the threshold + // (or if there was no previous threshold and we're the first) + let should_update_filter = + match (old_threshold.as_ref(), 
current_threshold.as_ref()) { + // We successfully swapped + (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true, + // We were the first to set it + (None, None) => true, + // Another thread updated before us, check if our threshold is still better + (Some(actual_old), _) => { + actual_old.as_slice().cmp(new_threshold_row) == Ordering::Greater Review Comment: I think there is a race condition here if one thread decides to update `threshold_row`, but another thread has subsequently updated it. I think that, as written, the first thread will still update the filter but will not also re-update `threshold_row` to the new value 🤔 ########## datafusion/physical-expr/src/expressions/dynamic_filters.rs: ########## @@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr { Self { children, remapped_children: None, // Initially no remapped children - inner: Arc::new(RwLock::new(Inner::new(inner))), + inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))), Review Comment: > That said it's a well maintained dep from a reputable maintainer, has a lot of usage and has no dependencies of its own. https://github.com/vorner seems OK, but is also the only maintainer, which worries me. 
It seems to have plenty of dependencies: https://crates.io/crates/arc-swap/1.7.1/dependencies And its last release was over a year ago (https://crates.io/crates/arc-swap/1.7.1), which makes me somewhat worried about its maintainability. ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -121,13 +122,37 @@ pub struct TopK { /// Common sort prefix between the input and the sort expressions to allow early exit optimization common_sort_prefix: Arc<[PhysicalSortExpr]>, /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown - filter: Option<Arc<DynamicFilterPhysicalExpr>>, + filter: TopKDynamicFilters, /// If true, indicates that all rows of subsequent batches are guaranteed /// to be greater (by byte order, after row conversion) than the top K, /// which means the top K won't change and the computation can be finished early. pub(crate) finished: bool, } +#[derive(Debug, Clone)] +pub struct TopKDynamicFilters { + /// The current *global* threshold for the dynamic filter. + /// This is shared across all partitions and is updated by any of them. + /// Stored as row bytes for efficient comparison. + threshold_row: Arc<ArcSwapOption<Vec<u8>>>, + /// The expression used to evaluate the dynamic filter + expr: Arc<DynamicFilterPhysicalExpr>, Review Comment: Since the `expr` should only be updated when the `threshold_row` is updated, I personally suggest controlling them with the same mutex / `ArcSwap`, something like: ```rust pub struct TopK { ... /// locking is done for both fields at once filter: ArcSwap<TopKDynamicFilters>, ... } struct TopKDynamicFilters { threshold_row: Option<Vec<u8>>, /// The expression used to evaluate the dynamic filter expr: Arc<DynamicFilterPhysicalExpr>, } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org