adriangb commented on code in PR #15769:
URL: https://github.com/apache/datafusion/pull/15769#discussion_r2051611694
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -148,7 +149,7 @@ mod tests {
let mut source = ParquetSource::default();
if let Some(predicate) = predicate {
- source = source.with_predicate(Arc::clone(&file_schema), predicate);
+ source = source.with_predicate(predicate);
Review Comment:
This seemed like an easy win: I was able to change this so that the schema is
_always_ passed in by the `FileSourceConfigBuilder`, instead of only when
`with_predicate` is called.
This was necessary because `with_predicate` is no longer called to attach a
predicate; instead that happens during an optimization pass, so `ParquetSource`
needs to have the schema available at that point.
I left `with_predicate` in place to avoid churn, and in case there is a use
case for attaching a predicate directly through the scan instead of as a
`FilterExec` that later gets pushed into the scan.
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -982,18 +980,6 @@ impl TableProvider for ListingTable {
return Ok(TableProviderFilterPushDown::Exact);
}
- // if we can't push it down completely with only the filename-based/path-based
- // column names, then we should check if we can do parquet predicate pushdown
- let supports_pushdown = self.options.format.supports_filters_pushdown(
-     &self.file_schema,
-     &self.table_schema,
-     &[filter],
- )?;
-
- if supports_pushdown == FilePushdownSupport::Supported {
-     return Ok(TableProviderFilterPushDown::Exact);
- }
Review Comment:
The point of this PR is that this moves from being something specialized
that `ListingTable` does to something that works for any `TableProvider`, with
no special handling on their part. The compatibility checks also all happen
within the parquet data source machinery, instead of leaking implementation
details via `supports_filters_pushdown`.
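A rough sketch of the direction this takes, with simplified stand-in types (the names `try_pushdown_filters` and the result variants loosely follow the PR, but the signatures here are illustrative only): the source itself answers pushdown requests via a trait method with a conservative default, so nothing parquet-specific leaks up to the `TableProvider`.

```rust
// Illustrative result type; the real PR returns richer plan/result structs.
#[derive(Debug, PartialEq)]
enum FilterPushdown {
    Exact,       // the source fully evaluates the filter
    Unsupported, // the filter stays in a FilterExec above the scan
}

trait FileSource {
    // Conservative default: formats that can't evaluate filters keep them
    // upstream, so most sources need no special handling at all.
    fn try_pushdown_filters(&self, filters: &[&str]) -> Vec<FilterPushdown> {
        filters.iter().map(|_| FilterPushdown::Unsupported).collect()
    }
}

struct CsvSource;
impl FileSource for CsvSource {} // inherits the default: no pushdown

struct ParquetSource {
    columns: Vec<String>, // file schema columns, heavily simplified
}

impl FileSource for ParquetSource {
    // Toy check: accept a filter only if it references a column present in
    // the file schema. The real machinery also rejects nested types,
    // partition columns, etc.
    fn try_pushdown_filters(&self, filters: &[&str]) -> Vec<FilterPushdown> {
        filters
            .iter()
            .map(|f| {
                if self.columns.iter().any(|c| f.contains(c.as_str())) {
                    FilterPushdown::Exact
                } else {
                    FilterPushdown::Unsupported
                }
            })
            .collect()
    }
}

fn main() {
    let parquet = ParquetSource { columns: vec!["b".into()] };
    assert_eq!(parquet.try_pushdown_filters(&["b > 2"]), vec![FilterPushdown::Exact]);
    assert_eq!(CsvSource.try_pushdown_filters(&["b > 2"]), vec![FilterPushdown::Unsupported]);
}
```

The default method is the key design point: a `TableProvider` can forward filters blindly, and only sources that opt in (like parquet) ever claim them.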
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -649,73 +649,45 @@ mod test {
#[test]
fn nested_data_structures_prevent_pushdown() {
- let table_schema = get_basic_table_schema();
-
- let file_schema = Schema::new(vec![Field::new(
- "list_col",
- DataType::Struct(Fields::empty()),
- true,
- )]);
Review Comment:
This test was wrong! It wanted to verify that `list_col` prevents pushdown
because it is a nested type. Instead, pushdown was prevented because `list_col`
is not in the table schema!
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -587,4 +560,49 @@ impl FileSource for ParquetSource {
}
}
}
+
+ fn try_pushdown_filters(
Review Comment:
cc @berkaysynnada for this implementation
##########
datafusion/datasource-parquet/src/mod.rs:
##########
@@ -244,7 +242,7 @@ impl ParquetExecBuilder {
inner: DataSourceExec::new(Arc::new(base_config.clone())),
base_config,
predicate,
- pruning_predicate: parquet.pruning_predicate,
+ pruning_predicate: None, // for backwards compat since `ParquetExec` is only for backwards compat anyway
Review Comment:
Open to other suggestions (e.g. removing it). I felt like this minimizes
breakage for folks still using `ParquetExec`, who are likely the same folks
who want to do the least amount of work possible to upgrade.
##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,11 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
----
logical_plan
01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
Review Comment:
This is because the pushdown no longer happens at the logical level - it
happens at the physical level. This makes sense, in part because the checks for
suitability of pushdown are better at the physical level (there may be reasons
to reject a pushdown at the physical level that are not present at the logical
level, e.g. partition columns or encodings).
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -559,25 +549,8 @@ impl FileSource for ParquetSource {
.predicate()
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();
- let pruning_predicate_string = self
- .pruning_predicate
- .as_ref()
- .map(|pre| {
- let mut guarantees = pre
- .literal_guarantees()
- .iter()
- .map(|item| format!("{}", item))
- .collect_vec();
- guarantees.sort();
- format!(
- ", pruning_predicate={}, required_guarantees=[{}]",
- pre.predicate_expr(),
- guarantees.join(", ")
- )
- })
- .unwrap_or_default();
Review Comment:
In
https://github.com/apache/datafusion/pull/15561#pullrequestreview-2741008179
Andrew asked me to keep this, but now since the schema isn't even being passed
in to `with_predicate` it's going to be hard to keep these. I suggest we just
accept that they won't be present in the physical plans. If that's not okay
what I could do is generate them on the fly in `fmt_extra` or generate them if
`with_predicate` is called with a schema or `with_schema` is called with a
predicate. But I'd like to avoid that unless someone thinks it's worth it or
has another suggestion.
##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,11 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
----
logical_plan
01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2
Review Comment:
@berkaysynnada any idea why we have extra `CoalesceBatchesExec` and
`RepartitionExec` now?
##########
datafusion/datasource/src/file_format.rs:
##########
@@ -109,37 +108,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
not_impl_err!("Writer not implemented for this format")
}
- /// Check if the specified file format has support for pushing down the provided filters within
- /// the given schemas. Added initially to support the Parquet file format's ability to do this.
- fn supports_filters_pushdown(
Review Comment:
Binning specialized code that was also leaking parquet stuff through
DataSource and into TableProvider 😄
##########
datafusion/datasource/src/file_format.rs:
##########
@@ -109,37 +108,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
not_impl_err!("Writer not implemented for this format")
}
- /// Check if the specified file format has support for pushing down the provided filters within
- /// the given schemas. Added initially to support the Parquet file format's ability to do this.
- fn supports_filters_pushdown(
- &self,
- _file_schema: &Schema,
- _table_schema: &Schema,
- _filters: &[&Expr],
- ) -> Result<FilePushdownSupport> {
- Ok(FilePushdownSupport::NoSupport)
- }
-
/// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
fn file_source(&self) -> Arc<dyn FileSource>;
}
-/// An enum to distinguish between different states when determining if certain filters can be
-/// pushed down to file scanning
-#[derive(Debug, PartialEq)]
-pub enum FilePushdownSupport {
Review Comment:
Another one of these enums!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]