geoffreyclaude commented on code in PR #22991:
URL: https://github.com/apache/datafusion/pull/22991#discussion_r3437036972


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -904,19 +904,51 @@ impl SortExec {
         self.preserve_partitioning = preserve_partitioning;
         Arc::make_mut(&mut self.cache).partitioning =
             Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
+        if self.fetch.is_some() {
+            self.rebuild_filter_for_current_partitioning();
+        }
         self
     }
 
-    /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
+    fn topk_emitter_count(&self) -> usize {
+        self.cache.output_partitioning().partition_count()
+    }
+
+    /// Build a new shared TopK dynamic filter wrapper for this `SortExec`.
     fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
         let children = self
             .expr
             .iter()
             .map(|sort_expr| Arc::clone(&sort_expr.expr))
             .collect::<Vec<_>>();
-        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
-            DynamicFilterPhysicalExpr::new(children, lit(true)),
-        ))))
+        self.create_filter_with_expr(Arc::new(DynamicFilterPhysicalExpr::new(
+            children,
+            lit(true),
+        )))
+    }
+
+    fn create_filter_with_expr(
+        &self,
+        expr: Arc<DynamicFilterPhysicalExpr>,
+    ) -> Arc<RwLock<TopKDynamicFilters>> {
+        Arc::new(RwLock::new(
+            TopKDynamicFilters::new_with_topk_emitter_count(
+                expr,
+                self.topk_emitter_count(),
+            ),
+        ))
+    }
+
+    /// Rebuild the shared TopK filter wrapper after output partitioning changes.
+    ///
+    /// The dynamic filter expression is preserved, but wrapper state such as the
+    /// shared threshold and remaining emitter count is reset for the new
+    /// partitioning.
+    fn rebuild_filter_for_current_partitioning(&mut self) {

Review Comment:
   Yes, exactly: this is plan-construction / plan-rewrite state, not runtime
   execution state. Rebuilding the wrapper is intentional when the `SortExec`
   properties change, because the wrapper owns plan-derived state such as the
   shared threshold and the expected emitter count. We should not carry an
   active runtime threshold across that kind of rebuild.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to