kosiew commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2006732123
########## datafusion/physical-plan/src/topk/mod.rs: ##########
@@ -163,26 +187,32 @@ impl TopK {
         // TODO make this algorithmically better?:
         // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
         // this avoids some work and also might be better vectorizable.
-        let mut batch_entry = self.heap.register_batch(batch);
+        let mut heap = self.heap.try_write().map_err(|_| {
+            DataFusionError::Internal(
+                "Failed to acquire write lock on TopK heap".to_string(),

Review Comment:
   Both the `try_write()` here and the `let mut heap = self.heap.write().map_err` later at line 235 immediately return an error if the lock cannot be acquired. With `try_write()`, if another thread holds the lock even for a brief moment, the current thread errors out and the failure is converted into an internal error. In the blocking version using `write()`, a failure caused by a poisoned lock is likewise converted into an internal error. Converting lock acquisition failures into internal errors is a straightforward approach: it flags an unexpected situation, but it gives no detail about the nature of the contention and makes no attempt to recover from transient lock conflicts. Should we consider retrying on transient contention, or add logging for diagnostic information?

########## datafusion/datasource/src/file.rs: ##########
@@ -93,4 +93,15 @@ pub trait FileSource: Send + Sync {
         }
         Ok(None)
     }
+
+    fn supports_dynamic_filter_pushdown(&self) -> bool {
+        false
+    }
+
+    fn push_down_dynamic_filter(
+        &self,
+        _dynamic_filter: Arc<dyn DynamicFilterSource>,
+    ) -> datafusion_common::Result<Option<Arc<dyn FileSource>>> {
+        Ok(None)
+    }

Review Comment:
   This PR adds support for dynamic filter pushdown in multiple modules (e.g. FileSource, DataSource, ProjectionExec, RepartitionExec, FilterExec, and SortExec). Common helper functions or traits could reduce the code duplication. For example, a shared trait for dynamic filter pushdown behavior might centralize the logic and reduce maintenance overhead.

########## datafusion/datasource-parquet/src/mod.rs: ##########
@@ -541,11 +541,13 @@ impl ExecutionPlan for ParquetExec {
 fn should_enable_page_index(
     enable_page_index: bool,
     page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
+    has_dynamic_filters: bool,
 ) -> bool {
     enable_page_index
-        && page_pruning_predicate.is_some()
-        && page_pruning_predicate
-            .as_ref()
-            .map(|p| p.filter_number() > 0)
-            .unwrap_or(false)
+        && (page_pruning_predicate.is_some()
+            && page_pruning_predicate
+                .as_ref()
+                .map(|p| p.filter_number() > 0)
+                .unwrap_or(false))
+        || has_dynamic_filters
 }

Review Comment:
   A comment would be helpful to explain why the presence of dynamic filters should trigger enabling the page index.

########## datafusion/common/src/config.rs: ##########
@@ -590,6 +590,9 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true attempts to push down dynamic filters from TopK operations into file scans
+        pub enable_dynamic_filter_pushdown: bool, default = true

Review Comment:
   More detailed comments here would help future users and maintainers know:
   - under which query scenarios dynamic filter pushdown is expected to be most beneficial, and
   - how it interacts with static predicates and other optimizations (e.g. page pruning and bloom filters).
   Providing usage examples and performance caveats would also help.
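   For instance, the doc comment could be expanded roughly along these lines (only a suggested wording, based on how the feature is exercised elsewhere in this PR):

       /// When set to true, attempts to push down dynamic filters generated by
       /// TopK operations (e.g. `ORDER BY ... LIMIT n`) into file scans.
       /// Most beneficial when a small LIMIT with an ORDER BY is run over many
       /// files or row groups, since data that cannot change the current top-K
       /// results can be skipped once the heap has filled.
       /// The dynamic filter is ANDed with any static predicate and feeds the
       /// same pruning paths (row group statistics, page index), so it can only
       /// reduce, never widen, the data that is read.
       pub enable_dynamic_filter_pushdown: bool, default = true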
########## datafusion/core/src/datasource/physical_plan/parquet.rs: ##########
@@ -1655,4 +1656,46 @@ mod tests {
         assert_eq!(calls.len(), 2);
         assert_eq!(calls, vec![Some(123), Some(456)]);
     }
+
+    #[tokio::test]
+    async fn test_topk_predicate_pushdown() {
+        let ctx = SessionContext::new();
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
+            // We need to force 1 partition because TopK predicate pushdown happens on a per-partition basis
+            // If we had 1 file per partition (as an example) no pushdown would happen
+            .with_target_partitions(1);
+
+        let tmp_dir = TempDir::new().unwrap();
+        let path = tmp_dir.path().to_str().unwrap().to_string();
+        // The point here is that we write many, many files.
+        // So when we scan after we processed the first one we should be able to skip the rest
+        // because of the TopK predicate pushdown.
+        for file in 0..100 {
+            let name = format!("test{:02}.parquet", file);
+            write_file(&format!("{path}/{name}"));
+        }
+        ctx.register_listing_table("base_table", path, opt, None, None)
+            .await
+            .unwrap();
+
+        let query = "select name from base_table order by id desc limit 3";
+
+        let batches = ctx.sql(query).await.unwrap().collect().await.unwrap();
+        #[rustfmt::skip]
+        let expected = [
+            "+--------+",
+            "| name   |",
+            "+--------+",
+            "| test02 |",
+            "| test02 |",
+            "| test02 |",
+            "+--------+",
+        ];
+        assert_batches_eq!(expected, &batches);
+
+        let sql = format!("explain analyze {query}");
+        let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap();
+        let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap());
+        assert_contains!(explain_plan, "row_groups_pruned_statistics=96");

Review Comment:
   Should we also add tests for:
   - negative scenarios where dynamic filter pushdown should not be applied (a rough sketch of one such test is at the end of this message);
   - edge cases such as empty datasets, or cases with only dynamic filters and no static predicate;
   - verification that the combined predicate (static AND dynamic) behaves as expected in different configurations?

########## datafusion/datasource-parquet/src/opener.rs: ##########
@@ -102,25 +108,52 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
+        let dynamic_filters = self
+            .dynamic_filters
+            .iter()
+            .map(|f| f.current_filters())
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Collect dynamic_filters into a single predicate by reducing with AND
+        let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+            Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))

Review Comment:
   The dynamic filters are reduced with the AND operator and then combined with the static predicate. It would help to document the reasoning behind this choice and any assumptions about how these predicates interact. For example: using AND to combine the dynamic filters with the static predicate means that a row (or a row group) must satisfy both conditions before it is read from disk. The approach assumes that the static predicate and the dynamic filters are independent and complementary; in other words, the dynamic filters are not meant to replace or override the original predicate, they refine the set of rows even further. If they were combined using OR, you might end up reading more rows than necessary, which would negate the benefits of dynamic filtering. And since the dynamic filters are calculated at runtime, they may sometimes be conservative estimates.
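   Concretely, the combination could look something like the sketch below. This is only an illustration of the intended AND semantics; the function and variable names are made up, and only the `BinaryExpr::new(..., Operator::And, ...)` construction mirrors the diff:

       use std::sync::Arc;
       use datafusion_expr::Operator;
       use datafusion_physical_expr::expressions::BinaryExpr;
       use datafusion_physical_expr::PhysicalExpr;

       /// Conjoin the static predicate with the runtime (TopK-derived) predicate so
       /// a row group is read only if it could satisfy BOTH; if either side is
       /// absent, fall back to the other one unchanged.
       fn combine_predicates(
           static_predicate: Option<Arc<dyn PhysicalExpr>>,
           dynamic_predicate: Option<Arc<dyn PhysicalExpr>>,
       ) -> Option<Arc<dyn PhysicalExpr>> {
           match (static_predicate, dynamic_predicate) {
               (Some(s), Some(d)) => {
                   Some(Arc::new(BinaryExpr::new(s, Operator::And, d)) as Arc<dyn PhysicalExpr>)
               }
               (Some(s), None) => Some(s),
               (None, d) => d,
           }
       }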
   By combining them with AND, the system errs on the side of safety: data is excluded only when it is reasonably certain that the rows cannot match the overall query conditions.
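   Picking up the earlier question about negative scenarios, one possible shape for such a test is sketched below. It is only a sketch: it assumes the same `write_file` helper and imports as the existing `test_topk_predicate_pushdown` test, and the expected `explain analyze` text is a guess that would need to be verified against real output.

       #[tokio::test]
       async fn test_topk_predicate_pushdown_not_applied() {
           let ctx = SessionContext::new();
           let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
               .with_target_partitions(1);

           let tmp_dir = TempDir::new().unwrap();
           let path = tmp_dir.path().to_str().unwrap().to_string();
           for file in 0..10 {
               let name = format!("test{:02}.parquet", file);
               write_file(&format!("{path}/{name}"));
           }
           ctx.register_listing_table("base_table", path, opt, None, None)
               .await
               .unwrap();

           // No ORDER BY ... LIMIT, so no TopK operator exists and no dynamic
           // filter should be produced; the scan must still return every row.
           let query = "select name from base_table";
           let batches = ctx.sql(query).await.unwrap().collect().await.unwrap();
           let num_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
           assert!(num_rows > 0);

           // Without a dynamic filter (and with no static predicate) no row groups
           // should be pruned by statistics. The exact counter text is a guess.
           let sql = format!("explain analyze {query}");
           let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap();
           let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap());
           assert!(!explain_plan.contains("row_groups_pruned_statistics=9"));
       }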