adriangb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2017257698


##########
datafusion/common/src/config.rs:
##########
@@ -590,6 +590,9 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true attempts to push down dynamic filters from TopK operations into file scans
+        pub enable_dynamic_filter_pushdown: bool, default = true

Review Comment:
   Added a basic description here.



##########
datafusion/datasource-parquet/src/mod.rs:
##########
@@ -541,11 +541,13 @@ impl ExecutionPlan for ParquetExec {
 fn should_enable_page_index(
     enable_page_index: bool,
     page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
+    has_dynamic_filters: bool,
 ) -> bool {
     enable_page_index
-        && page_pruning_predicate.is_some()
-        && page_pruning_predicate
-            .as_ref()
-            .map(|p| p.filter_number() > 0)
-            .unwrap_or(false)
+        && (page_pruning_predicate.is_some()
+            && page_pruning_predicate
+                .as_ref()
+                .map(|p| p.filter_number() > 0)
+                .unwrap_or(false))
+        || has_dynamic_filters
 }

Review Comment:
   7f0c894a5



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +108,52 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
+        let dynamic_filters = self
+            .dynamic_filters
+            .iter()
+            .map(|f| f.current_filters())
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Collect dynamic_filters into a single predicate by reducing with AND
+        let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+            Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))

Review Comment:
   cce16e33a



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +110,52 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
+        let dynamic_filters = self
+            .dynamic_filters
+            .iter()
+            .map(|f| f.current_filters())
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Collect dynamic_filters into a single predicate by reducing with AND
+        let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+            Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
+        });
+        let enable_page_index = should_enable_page_index(
+            self.enable_page_index,
+            &self.page_pruning_predicate,
+            dynamic_predicate.is_some(),
+        );
+        let predicate = self.predicate.clone();
+        let predicate = match (predicate, dynamic_predicate) {

Review Comment:
   I ended up adding this. We can split it off later, once I get to cleaning up this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to