alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286140587


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -214,41 +239,39 @@ impl TopK {
 
         let mut selected_rows = None;
 
-        if let Some(filter) = self.filter.as_ref() {
-            // If a filter is provided, update it with the new rows
-            let filter = filter.current()?;
-            let filtered = filter.evaluate(&batch)?;
-            let num_rows = batch.num_rows();
-            let array = filtered.into_array(num_rows)?;
-            let mut filter = array.as_boolean().clone();
-            let true_count = filter.true_count();
-            if true_count == 0 {
-                // nothing to filter, so no need to update
-                return Ok(());
+        // If a filter is provided, update it with the new rows
+        let filter = self.filter.expr.current()?;
+        let filtered = filter.evaluate(&batch)?;

Review Comment:
   A minor nit for a follow-on PR would be to move this code into its own function, such as `TopK::update_dynamic_filter`, to make it easier to understand.



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   I made a PR to show what this looks like without `arc_swap`:
   - https://github.com/apache/datafusion/pull/17245



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow 
early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter 
pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished 
early.
     pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {

Review Comment:
   Does it need to be `pub`?



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -319,19 +342,84 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? 
else {
-            return Ok(());
+
+        let new_threshold_row = &max_row.row;
+
+        // Extract scalar values BEFORE acquiring lock to reduce critical 
section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+            Some(t) => t,
+            None => return Ok(()),
         };
 
+        // Extract filter expression reference before entering critical section
+        let filter_expr = Arc::clone(&self.filter.expr);
+
+        // Check if we need to update the threshold (lock-free read)
+        let current_threshold = self.filter.threshold_row.load();
+        let needs_update = match current_threshold.as_ref() {
+            Some(current_row) => {
+                // new < current means new threshold is more selective
+                current_row.as_slice().cmp(new_threshold_row) == 
Ordering::Greater
+            }
+            None => true, // No current threshold, so we need to set one
+        };
+
+        // Only proceed if we need to update
+        if needs_update {
+            // Build the filter expression OUTSIDE any synchronization
+            let predicate = Self::build_filter_expression(&self.expr, 
thresholds)?;
+            let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
+
+            // Atomically update the threshold using compare-and-swap
+            let old_threshold = self.filter.threshold_row.compare_and_swap(
+                &current_threshold,
+                Some(Arc::clone(&new_threshold_arc)),
+            );
+
+            // Only update filter if we successfully updated the threshold
+            // (or if there was no previous threshold and we're the first)
+            let should_update_filter =
+                match (old_threshold.as_ref(), current_threshold.as_ref()) {
+                    // We successfully swapped
+                    (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) 
=> true,
+                    // We were the first to set it
+                    (None, None) => true,
+                    // Another thread updated before us, check if our 
threshold is still better
+                    (Some(actual_old), _) => {
+                        actual_old.as_slice().cmp(new_threshold_row) == 
Ordering::Greater

Review Comment:
   I think there is a race condition here: one thread may decide to update `threshold_row` after another thread has already updated it.
   
   As written, I think the first thread will still update the filter, but it will not also re-update `threshold_row` 🤔 to its new value, so the filter expression and `threshold_row` can end up out of sync.



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   > That said, it's a well-maintained dependency from a reputable maintainer, has a lot of usage, and has no dependencies of its own.
   
   https://github.com/vorner seems fine, but they are also the only maintainer, which worries me.
   
   It seems to have plenty of dependencies:
   https://crates.io/crates/arc-swap/1.7.1/dependencies
   
   And its last release was over a year ago (https://crates.io/crates/arc-swap/1.7.1), which makes me somewhat worried about its maintainability.



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow 
early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter 
pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished 
early.
     pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
+    /// The current *global* threshold for the dynamic filter.
+    /// This is shared across all partitions and is updated by any of them.
+    /// Stored as row bytes for efficient comparison.
+    threshold_row: Arc<ArcSwapOption<Vec<u8>>>,
+    /// The expression used to evaluate the dynamic filter
+    expr: Arc<DynamicFilterPhysicalExpr>,

Review Comment:
   Since `expr` should only be updated when `threshold_row` is updated, I suggest guarding both fields with the same mutex / `ArcSwap`.
   
   Something like:
   
   ```rust
   pub struct TopK {
     ...
       /// locking is done for both fields at once
       filter: ArcSwap<TopKDynamicFilters>,
   ... 
   }
   
   struct TopKDynamicFilters {
       threshold_row: Option<Vec<u8>>,
       /// The expression used to evaluate the dynamic filter
       expr: Arc<DynamicFilterPhysicalExpr>,
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to