berkaysynnada commented on code in PR #15769:
URL: https://github.com/apache/datafusion/pull/15769#discussion_r2052242110


##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -589,4 +559,49 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &datafusion_common::config::ConfigOptions,
+    ) -> datafusion_common::Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        let Some(file_schema) = self.file_schema.clone() else {
+            return Ok(filter_pushdown_not_supported(fd));
+        };
+        let config_pushdown_enabled = 
config.execution.parquet.pushdown_filters;
+        let table_pushdown_enabled = self.pushdown_filters();
+        if table_pushdown_enabled || config_pushdown_enabled {

Review Comment:
   Is OR'ing these correct?



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -253,18 +251,18 @@ use object_store::ObjectStore;
 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
 /// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter
 /// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData
-#[derive(Clone, Default, Debug)]
+#[derive(Clone, Debug, Default)]
 pub struct ParquetSource {
     /// Options for reading Parquet files
     pub(crate) table_parquet_options: TableParquetOptions,
     /// Optional metrics
     pub(crate) metrics: ExecutionPlanMetricsSet,
+    /// The schema of the file.
+    /// In particular, this is the schema of the table without partition 
columns,
+    /// *not* the physical schema of the file.
+    pub(crate) file_schema: Option<SchemaRef>,
     /// Optional predicate for row filtering during parquet scan
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Optional predicate for pruning row groups (derived from `predicate`)

Review Comment:
   Good to see these being unified.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -253,18 +251,18 @@ use object_store::ObjectStore;
 /// [`RecordBatch`]: arrow::record_batch::RecordBatch
 /// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter
 /// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData
-#[derive(Clone, Default, Debug)]
+#[derive(Clone, Debug, Default)]
 pub struct ParquetSource {
     /// Options for reading Parquet files
     pub(crate) table_parquet_options: TableParquetOptions,
     /// Optional metrics
     pub(crate) metrics: ExecutionPlanMetricsSet,
+    /// The schema of the file.
+    /// In particular, this is the schema of the table without partition 
columns,
+    /// *not* the physical schema of the file.
+    pub(crate) file_schema: Option<SchemaRef>,

Review Comment:
   There is also another schema in FileScanConfig. Do they both reflect the 
file schema, not the physical schema? And can we somehow unify them?



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -982,18 +980,6 @@ impl TableProvider for ListingTable {
                     return Ok(TableProviderFilterPushDown::Exact);
                 }
 
-                // if we can't push it down completely with only the 
filename-based/path-based
-                // column names, then we should check if we can do parquet 
predicate pushdown
-                let supports_pushdown = 
self.options.format.supports_filters_pushdown(
-                    &self.file_schema,
-                    &self.table_schema,
-                    &[filter],
-                )?;
-
-                if supports_pushdown == FilePushdownSupport::Supported {
-                    return Ok(TableProviderFilterPushDown::Exact);
-                }

Review Comment:
   If that's the case, why don't we remove the `supports_filters_pushdown()` 
API entirely?



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -589,4 +559,49 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &datafusion_common::config::ConfigOptions,
+    ) -> datafusion_common::Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        let Some(file_schema) = self.file_schema.clone() else {

Review Comment:
   I'm asking to learn: in which cases does ParquetSource not have the schema?



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -589,4 +559,49 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &datafusion_common::config::ConfigOptions,
+    ) -> datafusion_common::Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        let Some(file_schema) = self.file_schema.clone() else {
+            return Ok(filter_pushdown_not_supported(fd));
+        };
+        let config_pushdown_enabled = 
config.execution.parquet.pushdown_filters;
+        let table_pushdown_enabled = self.pushdown_filters();
+        if table_pushdown_enabled || config_pushdown_enabled {
+            let mut conf = self.clone();

Review Comment:
   The name `conf` is not very clear.



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -982,18 +980,6 @@ impl TableProvider for ListingTable {
                     return Ok(TableProviderFilterPushDown::Exact);
                 }
 
-                // if we can't push it down completely with only the 
filename-based/path-based
-                // column names, then we should check if we can do parquet 
predicate pushdown
-                let supports_pushdown = 
self.options.format.supports_filters_pushdown(
-                    &self.file_schema,
-                    &self.table_schema,
-                    &[filter],
-                )?;
-
-                if supports_pushdown == FilePushdownSupport::Supported {
-                    return Ok(TableProviderFilterPushDown::Exact);
-                }

Review Comment:
   I have one question: aren't we expecting/preparing for people to use 
ListingTable if they read Parquet files? Are we eventually planning to remove 
all format-specific handling? Or is this the case only for filter pushdown?



##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -81,11 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > 
Int32(2)]
+02)--Projection: t_pushdown.a
+03)----Filter: t_pushdown.b > Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], 
partial_filters=[t_pushdown.b > Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[a], file_type=parquet, predicate=b@1 > 2, 
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, 
required_guarantees=[]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+05)--------DataSourceExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2

Review Comment:
   I have a guess, though not proven: CoalesceBatchesExec comes because of 
RepartitionExec, and RepartitionExec is inserted to satisfy partition count, 
which is 4. That's required by FilterExec now (which was pushed down at the 
logical level before), but that FilterExec is pushed down later after 
EnforceDistribution.
   
   So, this makes me think about the correct order of physical rules. 
PushdownFilter should probably run before the distribution and order satisfiers. 
But that could also bring some issues; I'm not sure.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -589,4 +559,49 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &datafusion_common::config::ConfigOptions,
+    ) -> datafusion_common::Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        let Some(file_schema) = self.file_schema.clone() else {
+            return Ok(filter_pushdown_not_supported(fd));
+        };
+        let config_pushdown_enabled = 
config.execution.parquet.pushdown_filters;
+        let table_pushdown_enabled = self.pushdown_filters();
+        if table_pushdown_enabled || config_pushdown_enabled {
+            let mut conf = self.clone();
+            let mut allowed_filters = vec![];
+            let mut remaining_filters = vec![];
+            for filter in &fd.filters {

Review Comment:
   Use `fd.take_filters()` to avoid the clones below.



##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -5006,7 +5006,7 @@ SELECT column5, avg(column1) FROM d GROUP BY column5;
 
 query I??
 SELECT column5, column1, avg(column1) OVER (PARTITION BY column5 ORDER BY 
column1 ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) as window_avg 
-FROM d WHERE column1 IS NOT NULL;
+FROM d WHERE column1 IS NOT NULL ORDER BY column5 DESC;

Review Comment:
   The plan doesn't change, so why did the results change? 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to