adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286514826


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
     pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
+    /// The current *global* threshold for the dynamic filter.
+    /// This is shared across all partitions and is updated by any of them.
+    /// Stored as row bytes for efficient comparison.
+    threshold_row: Arc<ArcSwapOption<Vec<u8>>>,
+    /// The expression used to evaluate the dynamic filter
+    expr: Arc<DynamicFilterPhysicalExpr>,

Review Comment:
   Yep, good call! I think otherwise we're bound to run into bugs / synchronization issues. Did it in bcc0dcdc4.
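For readers following the hunk above, here is a minimal, std-only sketch of how a shared, row-encoded threshold can be tightened by several partitions and then used to skip rows. It assumes an ascending sort whose row encoding preserves sort order, so plain byte-slice comparison matches the sort order. It swaps `ArcSwapOption<Vec<u8>>` for a `std::sync::RwLock` so it compiles without the `arc-swap` crate. `SharedThreshold`, `try_tighten`, `can_skip`, and `current` are hypothetical names for illustration, not DataFusion APIs.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for the shared part of `TopKDynamicFilters`.
/// Assumption: `RwLock<Option<Vec<u8>>>` replaces `ArcSwapOption<Vec<u8>>`.
#[derive(Debug, Clone, Default)]
struct SharedThreshold {
    /// Global threshold as row-encoded bytes; `None` until some partition's heap is full.
    threshold_row: Arc<RwLock<Option<Vec<u8>>>>,
}

impl SharedThreshold {
    /// Offer a partition's current K-th row. The check and the store happen
    /// under one write lock, so a looser value can never overwrite a tighter one.
    /// Returns true if the global threshold was tightened.
    fn try_tighten(&self, candidate: &[u8]) -> bool {
        let mut guard = self.threshold_row.write().unwrap();
        // Byte slices compare lexicographically, matching the assumed row order.
        let tighter = match guard.as_deref() {
            Some(current) => candidate < current,
            None => true,
        };
        if tighter {
            *guard = Some(candidate.to_vec());
        }
        tighter
    }

    /// A row that sorts strictly after the threshold cannot enter the global top K.
    fn can_skip(&self, row: &[u8]) -> bool {
        let guard = self.threshold_row.read().unwrap();
        guard.as_deref().map_or(false, |t| row > t)
    }

    /// Snapshot of the current threshold.
    fn current(&self) -> Option<Vec<u8>> {
        self.threshold_row.read().unwrap().clone()
    }
}

fn main() {
    let shared = SharedThreshold::default();
    // Each hypothetical partition reports the K-th row of its local heap.
    let handles: Vec<_> = [vec![5u8, 0], vec![3, 9], vec![7]]
        .into_iter()
        .map(|kth_row| {
            let shared = shared.clone();
            thread::spawn(move || shared.try_tighten(&kth_row))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Whatever the interleaving, the smallest offered row ends up as the threshold.
    assert_eq!(shared.current(), Some(vec![3, 9]));
    assert!(shared.can_skip(&[4]));
    assert!(!shared.can_skip(&[3, 1]));
}
```

A plain load-then-store on the shared slot would let two partitions race and leave the looser value in place; keeping the comparison and the update in one critical section avoids that. With a lock-free `ArcSwapOption`, a compare-and-swap loop (for example via its `rcu` method) would play the same role.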



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

