adriangb commented on code in PR #15301: URL: https://github.com/apache/datafusion/pull/15301#discussion_r2017257698
########## datafusion/common/src/config.rs: ########## @@ -590,6 +590,9 @@ config_namespace! { /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true + /// When set to true attempts to push down dynamic filters from TopK operations into file scans + pub enable_dynamic_filter_pushdown: bool, default = true Review Comment: Added a basic description here. ########## datafusion/datasource-parquet/src/mod.rs: ########## @@ -541,11 +541,13 @@ impl ExecutionPlan for ParquetExec { fn should_enable_page_index( enable_page_index: bool, page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>, + has_dynamic_filters: bool, ) -> bool { enable_page_index - && page_pruning_predicate.is_some() - && page_pruning_predicate - .as_ref() - .map(|p| p.filter_number() > 0) - .unwrap_or(false) + && (page_pruning_predicate.is_some() + && page_pruning_predicate + .as_ref() + .map(|p| p.filter_number() > 0) + .unwrap_or(false)) + || has_dynamic_filters } Review Comment: 7f0c894a5 ########## datafusion/datasource-parquet/src/opener.rs: ########## @@ -102,25 +108,52 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; + let dynamic_filters = self + .dynamic_filters + .iter() + .map(|f| f.current_filters()) + .collect::<Result<Vec<_>>>()? + .into_iter() + .flatten() + .collect::<Vec<_>>(); + // Collect dynamic_filters into a single predicate by reducing with AND + let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| { + Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b)) Review Comment: cce16e33a ########## datafusion/datasource-parquet/src/opener.rs: ########## @@ -102,25 +110,52 @@ impl FileOpener for ParquetOpener { let batch_size = self.batch_size; + let dynamic_filters = self + .dynamic_filters + .iter() + .map(|f| f.current_filters()) + .collect::<Result<Vec<_>>>()? + .into_iter() + .flatten() + .collect::<Vec<_>>(); + // Collect dynamic_filters into a single predicate by reducing with AND + let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| { + Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b)) + }); + let enable_page_index = should_enable_page_index( + self.enable_page_index, + &self.page_pruning_predicate, + dynamic_predicate.is_some(), + ); + let predicate = self.predicate.clone(); + let predicate = match (predicate, dynamic_predicate) { Review Comment: I ended up adding this. We can split this off later once I get to doing cleanup on this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org