adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083516211


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -570,6 +680,47 @@ impl TopKHeap {
             + self.store.size()
             + self.owned_bytes
     }
+
+    fn get_threshold_values(
+        &self,
+        sort_exprs: &[PhysicalSortExpr],
+    ) -> Result<Option<Vec<ScalarValue>>> {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let max_row = match self.max() {
+            Some(row) => row,
+            None => return Ok(None),
+        };
+
+        // Get the batch that contains the max row
+        let batch_entry = match self.store.get(max_row.batch_id) {
+            Some(entry) => entry,
+            None => return internal_err!("Invalid batch ID in TopKRow"),
+        };
+
+        // Extract threshold values for each sort expression
+        let mut scalar_values = Vec::with_capacity(sort_exprs.len());
+        for sort_expr in sort_exprs {

Review Comment:
   Maybe? We should measure overhead / execution time for a large case first. 
If it’s under a couple hundred ms it’s probably not worth the overhead. If we 
do parallelize I assume we can just pipe the asyncness down to here and use 
Tokio tasks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to