alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2294236933


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -319,19 +343,89 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
+
+        let new_threshold_row = &max_row.row;
+
+        // Extract scalar values BEFORE acquiring lock to reduce critical section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {

Review Comment:
   I think we can also move this down to after the update check:
   - https://github.com/pydantic/datafusion/pull/37
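A minimal sketch of the ordering the comment suggests, assuming the "update check" compares the new k-th row against the currently installed threshold. Everything here is hypothetical: `SharedThreshold`, `extract_scalars` (standing in for the role `get_threshold_values` plays in the diff), and the byte-encoded rows are illustrations, not DataFusion types. An ascending sort is assumed, so a smaller row is a tighter threshold. The point is that the extraction runs only after a cheap comparison says an update is worthwhile, and still outside the write lock.

```rust
use std::sync::RwLock;

/// Hypothetical shared filter state: the current threshold row (in a
/// byte-comparable row encoding) and the scalar values decoded from it.
#[derive(Default)]
struct SharedThreshold {
    row: Option<Vec<u8>>,
    scalars: Vec<i64>,
}

/// Hypothetical stand-in for the comparatively expensive row -> scalar
/// conversion.
fn extract_scalars(row: &[u8]) -> Vec<i64> {
    row.iter().map(|&b| i64::from(b)).collect()
}

/// Returns `true` if `new_row` became the new threshold. Assumes an
/// ascending sort, so a smaller row is a tighter (more selective) threshold.
fn update_filter(shared: &RwLock<SharedThreshold>, new_row: &[u8]) -> bool {
    let is_tighter = |current: &Option<Vec<u8>>| match current {
        Some(cur) => new_row < cur.as_slice(),
        None => true,
    };

    // 1. Cheap check under a short-lived read lock: bail out before doing
    //    any extraction work if the threshold would not improve.
    if !is_tighter(&shared.read().unwrap().row) {
        return false;
    }

    // 2. The expensive extraction runs only when an update is likely,
    //    and outside any lock.
    let scalars = extract_scalars(new_row);

    // 3. Re-check under the write lock: another partition may have
    //    installed a tighter threshold in the meantime.
    let mut guard = shared.write().unwrap();
    if !is_tighter(&guard.row) {
        return false;
    }
    guard.row = Some(new_row.to_vec());
    guard.scalars = scalars;
    true
}

fn main() {
    let shared = RwLock::new(SharedThreshold::default());
    println!("{}", update_filter(&shared, &[5, 0])); // no threshold yet: installed
    println!("{}", update_filter(&shared, &[6, 0])); // looser: skipped, no extraction
    println!("{}", update_filter(&shared, &[3, 1])); // tighter: installed
}
```

The re-check under the write lock matters when several partitions share one filter: without it, a slower partition could overwrite a tighter threshold with a looser one.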

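The doc comment in the `update_filter` hunk shows the shape of the generated filter, `(a > 2 OR (a = 2 AND b < 3))`. Below is a sketch of how such a lexicographic "strictly better than the threshold" predicate can be built, assuming that example corresponds to `ORDER BY a DESC, b ASC` with threshold row `(2, 3)`. `Dir` and `threshold_filter` are hypothetical, the output is plain SQL-like text rather than a DataFusion physical expression, and NULL ordering is ignored.

```rust
/// Sort direction of one ORDER BY key (hypothetical helper type).
#[derive(Clone, Copy)]
enum Dir {
    Asc,
    Desc,
}

/// Builds the OR over i of (k0 = t0 AND ... AND k(i-1) = t(i-1) AND ki <op> ti),
/// where <op> is `<` for ASC keys and `>` for DESC keys.
fn threshold_filter(keys: &[(&str, Dir)], thresholds: &[i64]) -> String {
    assert_eq!(keys.len(), thresholds.len(), "one threshold per sort key");
    let mut disjuncts = Vec::with_capacity(keys.len());
    for i in 0..keys.len() {
        // Equality on every earlier key...
        let mut terms: Vec<String> = (0..i)
            .map(|j| format!("{} = {}", keys[j].0, thresholds[j]))
            .collect();
        // ...and a strict comparison on key i.
        let op = match keys[i].1 {
            Dir::Asc => "<",
            Dir::Desc => ">",
        };
        terms.push(format!("{} {} {}", keys[i].0, op, thresholds[i]));
        let conj = terms.join(" AND ");
        disjuncts.push(if terms.len() > 1 { format!("({conj})") } else { conj });
    }
    let expr = disjuncts.join(" OR ");
    if disjuncts.len() > 1 { format!("({expr})") } else { expr }
}

fn main() {
    // ORDER BY a DESC, b ASC, where the current k-th row is (a = 2, b = 3).
    println!("{}", threshold_filter(&[("a", Dir::Desc), ("b", Dir::Asc)], &[2, 3]));
}
```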


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
                 .with_row_groups(row_group_indexes)
                 .build()?;
 
-            let adapted = stream
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                .map(move |maybe_batch| {
-                    maybe_batch
-                        .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
-                });
+            // Create a stateful stream that can check pruning after each batch
+            let adapted = {

Review Comment:
   I found this code somewhat 🤯 (and this function is already hundreds of lines long). I spent some time refactoring it into its own stream for readability, and I understand it better now. I'll put up a follow-on PR to extract this logic; there's no need to do it in this one.
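A minimal sketch of the "stateful stream that can check pruning after each batch" as a standalone adapter, in the spirit of the refactor the comment describes. To stay dependency-free it implements `Iterator` instead of `futures::Stream`, and `Batch`, `EarlyStopping`, and the `should_prune` callback are hypothetical names, not the code from the follow-on PR.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    rows: Vec<i64>,
}

/// Wraps an inner sequence of batches and, before yielding each one, asks
/// `should_prune` whether the rest of the file can now be skipped (e.g.
/// because a dynamic filter became tighter). Once it says yes, or the
/// inner sequence ends, the adapter stays finished.
struct EarlyStopping<I, F> {
    inner: I,
    should_prune: F,
    done: bool,
}

impl<I, F> EarlyStopping<I, F> {
    fn new(inner: I, should_prune: F) -> Self {
        Self { inner, should_prune, done: false }
    }
}

impl<I, F> Iterator for EarlyStopping<I, F>
where
    I: Iterator<Item = Result<Batch, String>>,
    F: FnMut() -> bool,
{
    type Item = Result<Batch, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        // Re-evaluate pruning between batches, so a filter update made
        // while the previous batch was being processed takes effect now.
        if (self.should_prune)() {
            self.done = true;
            return None;
        }
        let item = self.inner.next();
        if item.is_none() {
            self.done = true;
        }
        item
    }
}

fn main() {
    let batches = (0..5).map(|i| Ok::<Batch, String>(Batch { rows: vec![i] }));
    // Pretend the filter becomes selective enough to prune after 2 batches.
    let mut checks = 0;
    let stream = EarlyStopping::new(batches, move || {
        checks += 1;
        checks > 2
    });
    for batch in stream {
        println!("{:?}", batch);
    }
}
```

Per-batch work such as the `schema_mapping.map_batch` call in the removed lines can be layered on with `.map(...)`, which keeps the early-stopping state out of the already long function body.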



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

