adriangb commented on code in PR #15769:
URL: https://github.com/apache/datafusion/pull/15769#discussion_r2051611694


##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -148,7 +149,7 @@ mod tests {
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
-                source = source.with_predicate(Arc::clone(&file_schema), predicate);
+                source = source.with_predicate(predicate);

Review Comment:
   This seemed like an easy win: I was able to just change this so that the
schema is _always_ passed in by the `FileSourceConfigBuilder`, instead of only
when `with_predicate` is called.
   This was necessary because `with_predicate` is no longer called to attach a
predicate; instead that happens during an optimization pass, so
`ParquetSource` needs to have the schema available at that point.
   I left `with_predicate` in there to avoid churn, and in case there is a use
case for attaching a predicate directly through the scan instead of as a
`FilterExec` that later gets pushed into the scan.
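   For illustration, the resulting call pattern is roughly the following (a
sketch using names from this discussion; `with_schema` is assumed here and the
exact signatures may differ):
   ```rust
   let mut source = ParquetSource::default();
   // The schema is now attached unconditionally (by `FileSourceConfigBuilder`
   // / a `with_schema` call), no longer only when a predicate is present.
   source = source.with_schema(Arc::clone(&file_schema));
   if let Some(predicate) = predicate {
       // `with_predicate` now takes only the predicate.
       source = source.with_predicate(predicate);
   }
   ```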



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -982,18 +980,6 @@ impl TableProvider for ListingTable {
                     return Ok(TableProviderFilterPushDown::Exact);
                 }
 
-                // if we can't push it down completely with only the filename-based/path-based
-                // column names, then we should check if we can do parquet predicate pushdown
-                let supports_pushdown = self.options.format.supports_filters_pushdown(
-                    &self.file_schema,
-                    &self.table_schema,
-                    &[filter],
-                )?;
-
-                if supports_pushdown == FilePushdownSupport::Supported {
-                    return Ok(TableProviderFilterPushDown::Exact);
-                }

Review Comment:
   The point of this PR is that this moves from being something specialized
that `ListingTable` does to something that works for any `TableProvider`, with
no special handling required on their part. The compatibility checks also now
happen entirely within the parquet data source machinery, instead of leaking
implementation details via `supports_filters_pushdown`.
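   For a downstream `TableProvider` the minimal story now looks roughly like
this (a sketch of the standard trait method, not code from this PR):
   ```rust
   fn supports_filters_pushdown(
       &self,
       filters: &[&Expr],
   ) -> Result<Vec<TableProviderFilterPushDown>> {
       // No parquet-specific checks needed here: reporting Inexact keeps a
       // Filter above the scan, and the physical optimizer pass pushes it
       // into the parquet source when the source actually supports it.
       Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
   }
   ```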



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -649,73 +649,45 @@ mod test {
 
     #[test]
     fn nested_data_structures_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
-        let file_schema = Schema::new(vec![Field::new(
-            "list_col",
-            DataType::Struct(Fields::empty()),
-            true,
-        )]);

Review Comment:
   This test was wrong! It wanted to verify that `list_col` prevents pushdown
because it is a nested type. Instead, pushdown was being prevented because
`list_col` was not in the table schema at all!
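   A sketch of the corrected setup, using the same field as before but placing
it in the table schema so the nested type is the only reason left to reject
pushdown:
   ```rust
   // `list_col` now exists in the schema, so pushdown is rejected because
   // of the nested type, not because the column is unknown.
   let table_schema = Schema::new(vec![Field::new(
       "list_col",
       DataType::Struct(Fields::empty()),
       true,
   )]);
   ```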



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -587,4 +560,49 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn try_pushdown_filters(

Review Comment:
   cc @berkaysynnada for this implementation



##########
datafusion/datasource-parquet/src/mod.rs:
##########
@@ -244,7 +242,7 @@ impl ParquetExecBuilder {
             inner: DataSourceExec::new(Arc::new(base_config.clone())),
             base_config,
             predicate,
-            pruning_predicate: parquet.pruning_predicate,
+            pruning_predicate: None, // for backwards compat since `ParquetExec` is only for backwards compat anyway

Review Comment:
   Open to other suggestions (e.g. removing it entirely). I felt this
minimizes breakage for folks still using `ParquetExec`, who are likely the
same folks who want to do the least amount of work possible to upgrade.



##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,11 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]

Review Comment:
   This is because the pushdown no longer happens at the logical level; it
happens at the physical level. That makes sense, in part because the
suitability checks are better done at the physical level: there may be reasons
to reject a pushdown there that are not visible at the logical level, e.g.
partition columns or encodings.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -559,25 +549,8 @@ impl FileSource for ParquetSource {
                     .predicate()
                     .map(|p| format!(", predicate={p}"))
                     .unwrap_or_default();
-                let pruning_predicate_string = self
-                    .pruning_predicate
-                    .as_ref()
-                    .map(|pre| {
-                        let mut guarantees = pre
-                            .literal_guarantees()
-                            .iter()
-                            .map(|item| format!("{}", item))
-                            .collect_vec();
-                        guarantees.sort();
-                        format!(
-                            ", pruning_predicate={}, required_guarantees=[{}]",
-                            pre.predicate_expr(),
-                            guarantees.join(", ")
-                        )
-                    })
-                    .unwrap_or_default();

Review Comment:
   In
https://github.com/apache/datafusion/pull/15561#pullrequestreview-2741008179
Andrew asked me to keep this, but since the schema isn't even being passed in
to `with_predicate` anymore it's going to be hard to keep these. I suggest we
just accept that they won't be present in the physical plans. If that's not
okay, what I could do is generate them on the fly in `fmt_extra`, or generate
them if `with_predicate` is called with a schema or `with_schema` is called
with a predicate (roughly as sketched below). But I'd like to avoid that
unless someone thinks it's worth it or has another suggestion.
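   For reference, the on-the-fly alternative would look roughly like this
inside `fmt_extra` (a sketch only: `self.schema()` is a hypothetical accessor
here, and failures are silently skipped since this is display-only):
   ```rust
   if let (Some(predicate), Some(schema)) = (self.predicate(), self.schema()) {
       if let Ok(pre) = PruningPredicate::try_new(Arc::clone(predicate), schema) {
           write!(f, ", pruning_predicate={}", pre.predicate_expr())?;
       }
   }
   ```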



##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,11 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2

Review Comment:
   @berkaysynnada any idea why we have extra `CoalesceBatchesExec` and 
`RepartitionExec` now?



##########
datafusion/datasource/src/file_format.rs:
##########
@@ -109,37 +108,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         not_impl_err!("Writer not implemented for this format")
     }
 
-    /// Check if the specified file format has support for pushing down the provided filters within
-    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
-    fn supports_filters_pushdown(

Review Comment:
   Binning specialized code that was also leaking parquet stuff through 
DataSource and into TableProvider 😄 



##########
datafusion/datasource/src/file_format.rs:
##########
@@ -109,37 +108,10 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         not_impl_err!("Writer not implemented for this format")
     }
 
-    /// Check if the specified file format has support for pushing down the provided filters within
-    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
-    fn supports_filters_pushdown(
-        &self,
-        _file_schema: &Schema,
-        _table_schema: &Schema,
-        _filters: &[&Expr],
-    ) -> Result<FilePushdownSupport> {
-        Ok(FilePushdownSupport::NoSupport)
-    }
-
     /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
     fn file_source(&self) -> Arc<dyn FileSource>;
 }
 
-/// An enum to distinguish between different states when determining if certain filters can be
-/// pushed down to file scanning
-#[derive(Debug, PartialEq)]
-pub enum FilePushdownSupport {

Review Comment:
   Another one of these single-purpose enums that can be removed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
