adriangb commented on code in PR #15263:
URL: https://github.com/apache/datafusion/pull/15263#discussion_r1997813569


##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -224,6 +224,64 @@ mod tests {
         )
     }
 
+    #[tokio::test]
+    async fn test_pushdown_with_missing_column_in_file() {

Review Comment:
   Replacing the unit test with a more e2e test that shows that things work as expected.
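
   Roughly the kind of test I mean (a sketch only, not the PR's actual test: the file layout, helper-free structure, and the `tempfile`/`parquet` dev-dependencies are assumptions):

   ```rust
   use std::sync::Arc;

   use arrow::array::Int32Array;
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;
   use datafusion::error::Result;
   use datafusion::prelude::*;

   #[tokio::test]
   async fn pushdown_with_missing_column() -> Result<()> {
       // Write a parquet file that only contains column `a`...
       let dir = tempfile::tempdir()?;
       let path = dir.path().join("part.parquet");
       let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
       let batch = RecordBatch::try_new(
           Arc::clone(&file_schema),
           vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
       )?;
       let file = std::fs::File::create(&path)?;
       let mut writer = parquet::arrow::ArrowWriter::try_new(file, file_schema, None)?;
       writer.write(&batch)?;
       writer.close()?;

       // ...while the table schema also declares a nullable `b`.
       let table_schema = Schema::new(vec![
           Field::new("a", DataType::Int32, true),
           Field::new("b", DataType::Utf8, true),
       ]);

       // Enable parquet filter pushdown so the row filter path is exercised.
       let mut config = SessionConfig::new();
       config.options_mut().execution.parquet.pushdown_filters = true;
       let ctx = SessionContext::new_with_config(config);
       ctx.register_parquet(
           "t",
           path.to_str().unwrap(),
           ParquetReadOptions::default().schema(&table_schema),
       )
       .await?;

       // The SchemaAdapter fills `b` with nulls, so a pushed-down filter on
       // the missing column should see nulls rather than error.
       let batches = ctx
           .sql("SELECT a FROM t WHERE b IS NULL")
           .await?
           .collect()
           .await?;
       let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
       assert_eq!(rows, 3);
       Ok(())
   }
   ```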



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
     /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
     pub fn try_new(
         candidate: FilterCandidate,
-        schema: &Schema,
         metadata: &ParquetMetaData,
         rows_pruned: metrics::Count,
         rows_matched: metrics::Count,
         time: metrics::Time,
-        schema_mapping: Arc<dyn SchemaMapper>,
     ) -> Result<Self> {
-        let schema = Arc::new(schema.project(&candidate.projection)?);
-        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
-
-        // ArrowPredicate::evaluate is passed columns in the order they appear in the file
-        // If the predicate has multiple columns, we therefore must project the columns based
-        // on the order they appear in the file
-        let projection = match candidate.projection.len() {
-            0 | 1 => vec![],
-            2.. => remap_projection(&candidate.projection),
-        };

Review Comment:
   I think this is no longer necessary and is now handled by the SchemaAdapter. It would be nice to have a test to point to that confirms this.
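
   For reference, a small sketch of my own (not PR code; the module path and exact assertion are assumptions) of the SchemaAdapter doing the reordering that this projection logic used to do by hand:

   ```rust
   use std::sync::Arc;

   use arrow::datatypes::{DataType, Field, Schema};
   use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter};

   fn main() -> datafusion::error::Result<()> {
       // Table order is ["a", "b"], but the file stores ["b", "a"].
       let table_schema = Arc::new(Schema::new(vec![
           Field::new("a", DataType::Int32, true),
           Field::new("b", DataType::Utf8, true),
       ]));
       let file_schema = Schema::new(vec![
           Field::new("b", DataType::Utf8, true),
           Field::new("a", DataType::Int32, true),
       ]);
       let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
       let (_mapper, projection) = adapter.map_schema(&file_schema)?;
       // `projection` lists the file columns to read, in file order;
       // `_mapper.map_batch(...)` then rearranges them into table order,
       // so no separate remapping step is needed in the row filter.
       assert_eq!(projection, vec![0, 1]);
       Ok(())
   }
   ```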



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -537,12 +464,20 @@ pub fn build_row_filter(
     // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
     let predicates = split_conjunction(expr);
 
+    let file_schema = Arc::new(file_schema.clone());
+    let table_schema = Arc::new(table_schema.clone());

Review Comment:
   We could change the signature of `build_row_filter` to take `Arc`'d schemas, since the caller might already have an `Arc`'d version, but since it's `pub` that would introduce more breaking changes, and the clone seemed cheap enough. Open to doing that, though.
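
   For illustration, the hypothetical `Arc`-taking shape would be roughly (sketch only, remaining parameters omitted; not the PR's code):

   ```rust
   use arrow::datatypes::SchemaRef;

   // Callers already holding a SchemaRef would pass it with a cheap refcount
   // bump instead of the deep `Schema::clone` + `Arc::new` done internally.
   fn build_row_filter_arc(file_schema: SchemaRef, table_schema: SchemaRef) {
       let _ = (file_schema, table_schema); // ...rest of the body unchanged
   }
   ```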



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -336,82 +338,40 @@ impl<'schema> PushdownChecker<'schema> {
     }
 }
 
-impl TreeNodeRewriter for PushdownChecker<'_> {
+impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     type Node = Arc<dyn PhysicalExpr>;
 
-    fn f_down(
-        &mut self,
-        node: Arc<dyn PhysicalExpr>,
-    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
         if let Some(column) = node.as_any().downcast_ref::<Column>() {
             if let Some(recursion) = self.check_single_column(column.name()) {
-                return Ok(Transformed::new(node, false, recursion));
-            }
-        }
-
-        Ok(Transformed::no(node))
-    }
-
-    /// After visiting all children, rewrite column references to nulls if
-    /// they are not in the file schema.
-    /// We do this because they won't be relevant if they're not in the file schema, since that's
-    /// the only thing we're dealing with here as this is only used for the parquet pushdown during
-    /// scanning
-    fn f_up(
-        &mut self,
-        expr: Arc<dyn PhysicalExpr>,
-    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {

Review Comment:
   While this would be more efficient than the new system for the missing-column case, since it avoids generating a column of nulls altogether, the point is that it's not correct: it assumes the default SchemaAdapter is being used, but that's a pluggable trait.
   
   I do wonder if there's something that should be done to optimize for the default case. Not just here, but even more importantly at the stats level: even more efficient than pruning here would be to inject stats of `null_count = row_count` at the row-group stats level, which would prune much earlier and more cheaply. That would rely on the same assumption, though. Maybe https://github.com/apache/datafusion/issues/15220 can introduce an API to ask a SchemaAdapter for optional stats on columns it may generate?
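
   To make that concrete, a purely hypothetical API shape (nothing like this exists today; all names here are made up):

   ```rust
   use datafusion_common::stats::Precision;
   use datafusion_common::ColumnStatistics;

   // Hypothetical extension: let a schema adapter report statistics for table
   // columns it would synthesize, so predicates on a missing column can prune
   // at the row-group stats level instead of materializing null arrays.
   trait GeneratedColumnStats {
       fn generated_column_stats(&self, column: &str, row_count: usize) -> Option<ColumnStatistics>;
   }

   // What the default adapter's answer could look like: it fills missing
   // columns with nulls, so it can promise `null_count == row_count`.
   struct DefaultAdapterStats;

   impl GeneratedColumnStats for DefaultAdapterStats {
       fn generated_column_stats(&self, _column: &str, row_count: usize) -> Option<ColumnStatistics> {
           Some(ColumnStatistics {
               null_count: Precision::Exact(row_count),
               ..ColumnStatistics::new_unknown()
           })
       }
   }
   ```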



##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -276,14 +276,8 @@ mod tests {
         ]);
 
         let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
-        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
-        assert_eq!(indices, vec![0]);
-
-        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
-
-        // Mapping fails because it tries to fill in a non-nullable column with nulls
-        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
-        assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
+        let err = adapter.map_schema(&file_schema).unwrap_err().to_string();
+        assert!(err.contains("Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable"));

Review Comment:
   This now fails earlier, which is probably a good thing.



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -616,51 +547,19 @@ mod test {
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
-            .build(metadata)
-            .expect("building candidate");
-
-        assert!(candidate.is_none());
-    }
-
-    // If a column exists in the table schema but not the file schema it should be rewritten to a null expression
-    #[test]
-    fn test_filter_candidate_builder_rewrite_missing_column() {

Review Comment:
   See the newly added e2e test.



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -156,12 +142,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
         &self.projection_mask
     }
 
-    fn evaluate(&mut self, mut batch: RecordBatch) -> ArrowResult<BooleanArray> {
-        if !self.projection.is_empty() {
-            batch = batch.project(&self.projection)?;
-        };
-
-        let batch = self.schema_mapping.map_partial_batch(batch)?;
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let batch = self.schema_mapping.map_batch(batch)?;

Review Comment:
   Here is where we ditch `map_partial_batch` in favor of `map_batch`



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
     /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
     pub fn try_new(
         candidate: FilterCandidate,
-        schema: &Schema,
         metadata: &ParquetMetaData,
         rows_pruned: metrics::Count,
         rows_matched: metrics::Count,
         time: metrics::Time,
-        schema_mapping: Arc<dyn SchemaMapper>,
     ) -> Result<Self> {
-        let schema = Arc::new(schema.project(&candidate.projection)?);
-        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
-
-        // ArrowPredicate::evaluate is passed columns in the order they appear in the file
-        // If the predicate has multiple columns, we therefore must project the columns based
-        // on the order they appear in the file
-        let projection = match candidate.projection.len() {
-            0 | 1 => vec![],
-            2.. => remap_projection(&candidate.projection),
-        };
+        let projected_schema = candidate.filter_schema.clone();
+        let physical_expr =
+            reassign_predicate_columns(candidate.expr, &projected_schema, true)?;
 
         Ok(Self {
             physical_expr,
-            projection,
             projection_mask: ProjectionMask::roots(
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
             ),
             rows_pruned,
             rows_matched,
             time,
-            schema_mapping,
+            schema_mapping: candidate.schema_mapper,

Review Comment:
   Rename to `schema_mapper`?



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -194,9 +176,22 @@ impl ArrowPredicate for DatafusionArrowPredicate {
 /// See the module level documentation for more information.
 pub(crate) struct FilterCandidate {
     expr: Arc<dyn PhysicalExpr>,
+    /// Estimate for the total number of bytes that will need to be processed

Review Comment:
   Just adding some docs



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -803,9 +689,10 @@ mod test {
     fn basic_expr_doesnt_prevent_pushdown() {
         let table_schema = get_basic_table_schema();
 
-        let file_schema = Schema::new(vec![Field::new("str_col", DataType::Utf8, true)]);
+        let file_schema =
+            Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);

Review Comment:
   There is no `str_col` in the data returned by `get_basic_table_schema()`, but there is `string_col`.



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -447,41 +406,12 @@ pub fn can_expr_be_pushed_down_with_schemas(
     can_be_pushed
 }
 
-/// Computes the projection required to go from the file's schema order to the 
projected
-/// order expected by this filter
-///
-/// Effectively this computes the rank of each element in `src`
-fn remap_projection(src: &[usize]) -> Vec<usize> {

Review Comment:
   I believe this is taken care of by SchemaAdapter now 😄. Again it would be 
nice to be able to point at a (maybe existing) test to confirm. Maybe I need to 
try removing this on `main` and confirming which tests break.
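
   For posterity, what the removed helper computed, reconstructed as a standalone sketch with a worked example (close to, but not guaranteed byte-for-byte, the deleted code):

   ```rust
   /// Computes the rank of each index in `src`, i.e. the projection needed to
   /// go from the file's column order back to the order the predicate expects.
   fn remap_projection(src: &[usize]) -> Vec<usize> {
       let len = src.len();
       // Indices that sort the predicate's columns into ascending file order
       let mut sorted_indexes: Vec<_> = (0..len).collect();
       sorted_indexes.sort_unstable_by_key(|x| src[*x]);
       // Invert that permutation: each column's position within file order
       let mut projection: Vec<_> = (0..len).collect();
       projection.sort_unstable_by_key(|x| sorted_indexes[*x]);
       projection
   }

   fn main() {
       // The reader yields columns in ascending file order [1, 3, 4]; taking
       // them at positions [2, 0, 1] restores the predicate order [4, 1, 3].
       assert_eq!(remap_projection(&[4, 1, 3]), vec![2, 0, 1]);
   }
   ```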



##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -682,42 +581,43 @@ mod test {
             false,
         )]);
 
-        let table_ref = Arc::new(table_schema.clone());
-        let schema_adapter =
-            DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref), table_ref);
-        let (schema_mapping, _) = schema_adapter
-            .map_schema(&file_schema)
-            .expect("creating schema mapping");
-
-        let mut parquet_reader = parquet_reader_builder.build().expect("building reader");
-
-        // Parquet file is small, we only need 1 record batch
-        let first_rb = parquet_reader
-            .next()
-            .expect("expected record batch")
-            .expect("expected error free record batch");
-
         // Test all should fail
         let expr = col("timestamp_col").lt(Expr::Literal(
             ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
         ));
         let expr = logical2physical(&expr, &table_schema);
-        let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
-            .build(&metadata)
-            .expect("building candidate")
-            .expect("candidate expected");
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let table_schema = Arc::new(table_schema.clone());
+        let candidate = FilterCandidateBuilder::new(
+            expr,
+            file_schema.clone(),
+            table_schema.clone(),
+            schema_adapter_factory,
+        )
+        .build(&metadata)
+        .expect("building candidate")
+        .expect("candidate expected");
 
         let mut row_filter = DatafusionArrowPredicate::try_new(
             candidate,
-            &file_schema,
             &metadata,
             Count::new(),
             Count::new(),
             Time::new(),
-            Arc::clone(&schema_mapping),
         )
         .expect("creating filter predicate");
 
+        let mut parquet_reader = parquet_reader_builder
+            .with_projection(row_filter.projection().clone())

Review Comment:
   Moved down because it needs access to the row filter's projection



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
