kosiew commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2006732123


##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -163,26 +187,32 @@ impl TopK {
         // TODO make this algorithmically better?:
        // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
         //       this avoids some work and also might be better vectorizable.
-        let mut batch_entry = self.heap.register_batch(batch);
+        let mut heap = self.heap.try_write().map_err(|_| {
+            DataFusionError::Internal(
+                "Failed to acquire write lock on TopK heap".to_string(),

Review Comment:
   The use of `try_write()` here, and of `self.heap.write().map_err(...)` later at line 235, immediately converts any failure to acquire the lock into an error. With `try_write()` that means the current thread errors out if another thread holds the lock even for a brief moment; with the blocking `write()`, acquisition only fails when the lock is poisoned, and that failure is likewise turned into an internal error.
   
   Converting lock-acquisition failures into `DataFusionError::Internal` is a straightforward approach: it flags an unexpected situation, but it gives no detail about the nature of the contention and makes no attempt to recover from transient lock conflicts.
   
   Should we consider retrying on transient contention, or adding logging for diagnostic information?
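   One hypothetical shape for this, as a minimal sketch (not taken from the PR; the generic helper, the retry count, and the use of the `log` crate are all assumptions):

```rust
use std::sync::{RwLock, RwLockWriteGuard, TryLockError};

use datafusion_common::DataFusionError;

/// Sketch only: acquire the heap write lock, retrying a few times on
/// transient contention and logging each miss instead of failing at once.
fn lock_heap_with_retry<T>(
    heap: &RwLock<T>,
) -> Result<RwLockWriteGuard<'_, T>, DataFusionError> {
    for attempt in 0..3 {
        match heap.try_write() {
            Ok(guard) => return Ok(guard),
            Err(TryLockError::WouldBlock) => {
                // Transient contention: another thread holds the lock right now.
                log::debug!("TopK heap lock contended (attempt {attempt}), retrying");
                std::thread::yield_now();
            }
            Err(TryLockError::Poisoned(e)) => {
                // A poisoned lock means a writer panicked; surface it as before.
                return Err(DataFusionError::Internal(format!(
                    "TopK heap lock poisoned: {e}"
                )));
            }
        }
    }
    Err(DataFusionError::Internal(
        "Failed to acquire write lock on TopK heap after retries".to_string(),
    ))
}
```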



##########
datafusion/datasource/src/file.rs:
##########
@@ -93,4 +93,15 @@ pub trait FileSource: Send + Sync {
         }
         Ok(None)
     }
+
+    fn supports_dynamic_filter_pushdown(&self) -> bool {
+        false
+    }
+
+    fn push_down_dynamic_filter(
+        &self,
+        _dynamic_filter: Arc<dyn DynamicFilterSource>,
+    ) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
+        Ok(None)
+    }

Review Comment:
   This PR adds support for dynamic filter pushdown in multiple modules (e.g., `FileSource`, `DataSource`, `ProjectionExec`, `RepartitionExec`, `FilterExec`, and `SortExec`).
   
   Common helper functions or a shared trait could reduce the duplication. For example, a trait that centralizes the dynamic filter pushdown behavior would keep the logic in one place and reduce maintenance overhead.
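   A rough sketch of what such a trait could look like (all names below are illustrative, and the empty `DynamicFilterSource` trait is only a stand-in for the one this PR introduces):

```rust
use std::sync::Arc;

use datafusion_common::Result;

/// Stand-in for the dynamic filter source trait introduced by this PR.
pub trait DynamicFilterSource: Send + Sync {}

/// Hypothetical shared trait so FileSource, DataSource, and the exec nodes
/// implement one common pushdown surface instead of repeating these methods.
pub trait DynamicFilterPushdown {
    /// Whether this node can accept a dynamic filter at all.
    fn supports_dynamic_filter_pushdown(&self) -> bool {
        false
    }

    /// Attempt the pushdown, returning a rewritten node if the filter was absorbed.
    fn push_down_dynamic_filter(
        &self,
        _dynamic_filter: Arc<dyn DynamicFilterSource>,
    ) -> Result<Option<Arc<dyn DynamicFilterPushdown>>> {
        Ok(None)
    }
}
```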



##########
datafusion/datasource-parquet/src/mod.rs:
##########
@@ -541,11 +541,13 @@ impl ExecutionPlan for ParquetExec {
 fn should_enable_page_index(
     enable_page_index: bool,
     page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
+    has_dynamic_filters: bool,
 ) -> bool {
     enable_page_index
-        && page_pruning_predicate.is_some()
-        && page_pruning_predicate
-            .as_ref()
-            .map(|p| p.filter_number() > 0)
-            .unwrap_or(false)
+        && (page_pruning_predicate.is_some()
+            && page_pruning_predicate
+                .as_ref()
+                .map(|p| p.filter_number() > 0)
+                .unwrap_or(false))
+        || has_dynamic_filters
 }

Review Comment:
   A comment would be helpful to explain why the presence of dynamic filters should trigger page index enablement. For example:
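   Something along these lines, perhaps (the wording is a guess at the intent, not taken from the PR):

```rust
// Dynamic filters (e.g. from TopK) only become available at runtime, so keep
// the page index enabled even when no static page-pruning predicate was built
// at planning time: the late-arriving filters can still prune pages.
```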



##########
datafusion/common/src/config.rs:
##########
@@ -590,6 +590,9 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true attempts to push down dynamic filters from TopK operations into file scans
+        pub enable_dynamic_filter_pushdown: bool, default = true

Review Comment:
   Comments would be useful here so that future users and maintainers know:
   - Under which query scenarios dynamic filter pushdown is expected to be most beneficial.
   - How it interacts with static predicates and other optimizations (e.g., page pruning and bloom filters).
   
   Providing usage examples and performance caveats could help as well.
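   For example, the doc comment could be expanded along these lines (wording illustrative; the AND-combination detail comes from the opener changes in this PR):

```rust
/// When set to true, attempts to push down dynamic filters generated by
/// operators such as TopK (`ORDER BY ... LIMIT n`) into file scans.
/// Most useful for top-k queries over many files or row groups, where the
/// running threshold can prune data that cannot reach the final result.
/// The dynamic filter is ANDed with any static predicate, so it works
/// alongside page pruning and bloom filters rather than replacing them.
pub enable_dynamic_filter_pushdown: bool, default = true
```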



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1655,4 +1656,46 @@ mod tests {
         assert_eq!(calls.len(), 2);
         assert_eq!(calls, vec![Some(123), Some(456)]);
     }
+
+    #[tokio::test]
+    async fn test_topk_predicate_pushdown() {
+        let ctx = SessionContext::new();
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
+            // We need to force 1 partition because TopK predicate pushdown happens on a per-partition basis
+            // If we had 1 file per partition (as an example) no pushdown would happen
+            .with_target_partitions(1);
+
+        let tmp_dir = TempDir::new().unwrap();
+        let path = tmp_dir.path().to_str().unwrap().to_string();
+        // The point here is that we write many, many files.
+        // So when we scan after we processed the first one we should be able to skip the rest
+        // because of the TopK predicate pushdown.
+        for file in 0..100 {
+            let name = format!("test{:02}.parquet", file);
+            write_file(&format!("{path}/{name}"));
+        }
+        ctx.register_listing_table("base_table", path, opt, None, None)
+            .await
+            .unwrap();
+
+        let query = "select name from base_table order by id desc limit 3";
+
+        let batches = ctx.sql(query).await.unwrap().collect().await.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+--------+",
+            "| name   |",
+            "+--------+",
+            "| test02 |",
+            "| test02 |",
+            "| test02 |",
+            "+--------+",
+        ];
+        assert_batches_eq!(expected, &batches);
+
+        let sql = format!("explain analyze {query}");
+        let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap();
+        let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap());
+        assert_contains!(explain_plan, "row_groups_pruned_statistics=96");

Review Comment:
   Should we also add tests for:
   - Negative scenarios where dynamic filter pushdown should not be applied (a sketch of one such test follows below).
   - Edge cases such as empty datasets, or cases with only dynamic filters and no static predicate.
   - Verification that the combined predicate (static AND dynamic) behaves as expected in different configurations.
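   For the first point, a possible sketch (it assumes the new flag lives under the optimizer options next to `enable_topk_aggregation`, and it reuses the `write_file` helper from the test above):

```rust
#[tokio::test]
async fn test_topk_predicate_pushdown_disabled() {
    // Same setup as test_topk_predicate_pushdown, but with the flag off.
    let mut config = SessionConfig::new();
    config.options_mut().optimizer.enable_dynamic_filter_pushdown = false;
    let ctx = SessionContext::new_with_config(config);
    let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_target_partitions(1);

    let tmp_dir = TempDir::new().unwrap();
    let path = tmp_dir.path().to_str().unwrap().to_string();
    for file in 0..100 {
        let name = format!("test{:02}.parquet", file);
        write_file(&format!("{path}/{name}"));
    }
    ctx.register_listing_table("base_table", path, opt, None, None)
        .await
        .unwrap();

    let sql = "explain analyze select name from base_table order by id desc limit 3";
    let batches = ctx.sql(sql).await.unwrap().collect().await.unwrap();
    let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap());
    // Without the dynamic filter only static pruning applies, so the count
    // observed in the positive test should not appear here.
    assert_not_contains!(explain_plan, "row_groups_pruned_statistics=96");
}
```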



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +108,52 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
+        let dynamic_filters = self
+            .dynamic_filters
+            .iter()
+            .map(|f| f.current_filters())
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Collect dynamic_filters into a single predicate by reducing with AND
+        let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+            Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))

Review Comment:
   The dynamic filters are reduced with the AND operator and then combined with the static predicate. It would help to document the reasoning behind this choice and the assumptions about how these predicates interact.
   
   For example: combining the dynamic filters with the static predicate via AND means a row (or row group) must satisfy both conditions before it is read from disk.
   
   The approach assumes the static predicate and the dynamic filters are independent and complementary: the dynamic filters do not replace or override the original predicate, they only narrow the set of rows further. Combining them with OR would instead admit more rows than necessary and negate the benefit of dynamic filtering.
   
   Since the dynamic filters are computed at runtime, they may be conservative estimates. ANDing them keeps the system on the safe side, excluding data only when it is reasonably certain the rows cannot match the overall query conditions.
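   Concretely, a comment above the reduce could capture this (wording illustrative, summarizing the points above):

```rust
// Combine all TopK dynamic filters into a single predicate with AND; it is
// later ANDed with the static predicate, so a row group is read only if it
// can satisfy both. The dynamic filters refine (never replace) the user's
// predicate, and because they are conservative runtime estimates, ANDing them
// only ever excludes data that cannot appear in the final result.
let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
    Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
});
```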
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

