adriangb commented on code in PR #15769:
URL: https://github.com/apache/datafusion/pull/15769#discussion_r2051611694
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -148,7 +149,7 @@ mod tests {
         let mut source = ParquetSource::default();

         if let Some(predicate) = predicate {
-            source = source.with_predicate(Arc::clone(&file_schema), predicate);
+            source = source.with_predicate(predicate);

Review Comment:
   This seemed like an easy win, since I was able to just change this so that the schema is _always_ passed in by the `FileSourceConfigBuilder` instead of only when `with_predicate` is called. This was necessary because `with_predicate` is no longer called to attach a predicate; instead that happens during an optimization pass, so `ParquetSource` needs to have the schema available at that point. I left `with_predicate` in place to avoid churn, and in case there is a use case for attaching a predicate directly through the scan instead of as a `FilterExec` that later gets pushed into the scan (a rough before/after sketch of this call site follows the comments below).

##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -982,18 +980,6 @@ impl TableProvider for ListingTable {
             return Ok(TableProviderFilterPushDown::Exact);
         }

-        // if we can't push it down completely with only the filename-based/path-based
-        // column names, then we should check if we can do parquet predicate pushdown
-        let supports_pushdown = self.options.format.supports_filters_pushdown(
-            &self.file_schema,
-            &self.table_schema,
-            &[filter],
-        )?;
-
-        if supports_pushdown == FilePushdownSupport::Supported {
-            return Ok(TableProviderFilterPushDown::Exact);
-        }

Review Comment:
   The point of this PR is that this moves from being something specialized that `ListingTable` does to something that works for any `TableProvider`: providers don't need to do anything special! The compatibility checks also all happen within the Parquet data source machinery, instead of leaking implementation details via `supports_filters_pushdown`.

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -649,73 +649,45 @@ mod test {
     #[test]
     fn nested_data_structures_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
-        let file_schema = Schema::new(vec![Field::new(
-            "list_col",
-            DataType::Struct(Fields::empty()),
-            true,
-        )]);

Review Comment:
   This test was wrong! It wanted to verify that `list_col` prevents pushdown because it's a nested type. Instead, pushdown was prevented because `list_col` is not in the table schema!

##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -587,4 +560,49 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn try_pushdown_filters(

Review Comment:
   cc @berkaysynnada for this implementation

##########
datafusion/datasource-parquet/src/mod.rs:
##########
@@ -244,7 +242,7 @@ impl ParquetExecBuilder {
             inner: DataSourceExec::new(Arc::new(base_config.clone())),
             base_config,
             predicate,
-            pruning_predicate: parquet.pruning_predicate,
+            pruning_predicate: None, // for backwards compat since `ParquetExec` is only for backwards compat anyway

Review Comment:
   Open to other suggestions (e.g. removing it entirely). I felt this minimizes breakage for folks still using `ParquetExec`, who are likely the same folks who want to do the least amount of work possible to upgrade.
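   As promised above, a rough before/after sketch of the `with_predicate` call site from the first hunk. `file_schema` and `predicate` are the test-local variables from that hunk; the claim about the builder reflects the comment above rather than code shown here:

       // Before: the file schema had to be supplied together with the predicate.
       let source = ParquetSource::default()
           .with_predicate(Arc::clone(&file_schema), Arc::clone(&predicate));

       // After: the schema is always attached by the `FileSourceConfigBuilder`,
       // so `with_predicate` takes only the predicate. The predicate can also
       // be attached later, by the physical-level filter-pushdown optimizer
       // pass, which is why `ParquetSource` must already know the schema.
       let source = ParquetSource::default().with_predicate(Arc::clone(&predicate));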
##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,11 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]

Review Comment:
   This is because the pushdown no longer happens at the logical level; it happens at the physical level. That makes sense, in part because checks for pushdown suitability are better done at the physical level: there may be reasons to reject a pushdown at the physical level that are not visible at the logical level, e.g. partition columns or encodings.

##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -559,25 +549,8 @@ impl FileSource for ParquetSource {
             .predicate()
             .map(|p| format!(", predicate={p}"))
             .unwrap_or_default();
-        let pruning_predicate_string = self
-            .pruning_predicate
-            .as_ref()
-            .map(|pre| {
-                let mut guarantees = pre
-                    .literal_guarantees()
-                    .iter()
-                    .map(|item| format!("{}", item))
-                    .collect_vec();
-                guarantees.sort();
-                format!(
-                    ", pruning_predicate={}, required_guarantees=[{}]",
-                    pre.predicate_expr(),
-                    guarantees.join(", ")
-                )
-            })
-            .unwrap_or_default();

Review Comment:
   In https://github.com/apache/datafusion/pull/15561#pullrequestreview-2741008179 Andrew asked me to keep this, but now that the schema isn't even passed in to `with_predicate`, it's going to be hard to keep. I suggest we just accept that `pruning_predicate` and `required_guarantees` won't be present in the physical plans. If that's not okay, what I could do is generate them on the fly in `fmt_extra`, or generate them if `with_predicate` is called with a schema or `with_schema` is called with a predicate. But I'd like to avoid that unless someone thinks it's worth it or has another suggestion.
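   If keeping the display does turn out to matter, a rough sketch of the "generate on the fly in `fmt_extra`" option could look like the following. `self.schema()` is a hypothetical accessor standing in for wherever the schema ends up stored on `ParquetSource`; `PruningPredicate::try_new`, `literal_guarantees`, and `predicate_expr` are the same calls the removed code used:

       // Hypothetical sketch: rebuild the pruning predicate at display time
       // instead of storing it on the source.
       fn pruning_predicate_string(&self) -> String {
           // `self.schema()` is a stand-in: today the schema is attached by
           // the `FileSourceConfigBuilder`, not by `with_predicate`.
           let (Some(predicate), Some(schema)) = (self.predicate(), self.schema()) else {
               return String::new();
           };
           match PruningPredicate::try_new(Arc::clone(predicate), schema) {
               Ok(pre) => {
                   let mut guarantees: Vec<String> = pre
                       .literal_guarantees()
                       .iter()
                       .map(|g| g.to_string())
                       .collect();
                   guarantees.sort();
                   format!(
                       ", pruning_predicate={}, required_guarantees=[{}]",
                       pre.predicate_expr(),
                       guarantees.join(", ")
                   )
               }
               // If the predicate can't be converted, just omit the extras.
               Err(_) => String::new(),
           }
       }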
##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,11 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2

Review Comment:
   @berkaysynnada any idea why we have extra `CoalesceBatchesExec` and `RepartitionExec` nodes now?

##########
datafusion/datasource/src/file_format.rs:
##########
@@ -109,37 +108,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         not_impl_err!("Writer not implemented for this format")
     }

-    /// Check if the specified file format has support for pushing down the provided filters within
-    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
-    fn supports_filters_pushdown(

Review Comment:
   Binning specialized code that was also leaking Parquet-specific details through `DataSource` and into `TableProvider` 😄

##########
datafusion/datasource/src/file_format.rs:
##########
@@ -109,37 +108,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         not_impl_err!("Writer not implemented for this format")
     }

-    /// Check if the specified file format has support for pushing down the provided filters within
-    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
-    fn supports_filters_pushdown(
-        &self,
-        _file_schema: &Schema,
-        _table_schema: &Schema,
-        _filters: &[&Expr],
-    ) -> Result<FilePushdownSupport> {
-        Ok(FilePushdownSupport::NoSupport)
-    }
-
     /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
     fn file_source(&self) -> Arc<dyn FileSource>;
 }
-
-/// An enum to distinguish between different states when determining if certain filters can be
-/// pushed down to file scanning
-#[derive(Debug, PartialEq)]
-pub enum FilePushdownSupport {

Review Comment:
   Another one of these enums!
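   For downstream `FileFormat` implementations the migration is mostly deletion. A sketch under that assumption, with `MyFormat` and `MySource` as hypothetical placeholder types:

       impl FileFormat for MyFormat {
           // Before this PR a format could override `supports_filters_pushdown`
           // to advertise exact pushdown to `ListingTable`; that hook and the
           // `FilePushdownSupport` enum are gone. Nothing replaces them here,
           // because pushdown is now negotiated at the physical level through
           // the `FileSource` this format hands out.
           fn file_source(&self) -> Arc<dyn FileSource> {
               Arc::new(MySource::default())
           }

           // ...all other trait methods are unchanged...
       }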
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org