adriangb commented on code in PR #15263:
URL: https://github.com/apache/datafusion/pull/15263#discussion_r1997813569
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -224,6 +224,64 @@ mod tests {
         )
     }
 
+    #[tokio::test]
+    async fn test_pushdown_with_missing_column_in_file() {

Review Comment:
   Replacing the unit test with a more e2e test that shows that things work as expected

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
     /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
     pub fn try_new(
         candidate: FilterCandidate,
-        schema: &Schema,
         metadata: &ParquetMetaData,
         rows_pruned: metrics::Count,
         rows_matched: metrics::Count,
         time: metrics::Time,
-        schema_mapping: Arc<dyn SchemaMapper>,
     ) -> Result<Self> {
-        let schema = Arc::new(schema.project(&candidate.projection)?);
-        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
-
-        // ArrowPredicate::evaluate is passed columns in the order they appear in the file
-        // If the predicate has multiple columns, we therefore must project the columns based
-        // on the order they appear in the file
-        let projection = match candidate.projection.len() {
-            0 | 1 => vec![],
-            2.. => remap_projection(&candidate.projection),
-        };

Review Comment:
   I think this is no longer necessary and is handled by the SchemaAdapter. Might be nice to have a test to point to in order to confirm.

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -537,12 +464,20 @@
     // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
     let predicates = split_conjunction(expr);
 
+    let file_schema = Arc::new(file_schema.clone());
+    let table_schema = Arc::new(table_schema.clone());

Review Comment:
   We could change the signature of `build_row_filter` since the caller might have an `Arc`'d version already, but since it's `pub` that would introduce more breaking changes, and the clone seemed cheap enough. Open to doing that though.

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -336,82 +338,40 @@ impl<'schema> PushdownChecker<'schema> {
     }
 }
 
-impl TreeNodeRewriter for PushdownChecker<'_> {
+impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     type Node = Arc<dyn PhysicalExpr>;
 
-    fn f_down(
-        &mut self,
-        node: Arc<dyn PhysicalExpr>,
-    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
         if let Some(column) = node.as_any().downcast_ref::<Column>() {
             if let Some(recursion) = self.check_single_column(column.name()) {
-                return Ok(Transformed::new(node, false, recursion));
-            }
-        }
-
-        Ok(Transformed::no(node))
-    }
-
-    /// After visiting all children, rewrite column references to nulls if
-    /// they are not in the file schema.
-    /// We do this because they won't be relevant if they're not in the file schema, since that's
-    /// the only thing we're dealing with here as this is only used for the parquet pushdown during
-    /// scanning
-    fn f_up(
-        &mut self,
-        expr: Arc<dyn PhysicalExpr>,
-    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {

Review Comment:
   While this would be more efficient than the new system for the case of missing columns, since it avoids generating a column of nulls altogether, the point is that it's not correct: it assumes that the default SchemaAdapter is being used, but that's a pluggable trait. I do wonder if there's something that should be done to optimize for the default case.

   Not just here, but even more importantly at the stats level: even more efficient than pruning here would be to inject stats of `null_count = row_count` at the row group stats level, which would prune much earlier and more cheaply. That would rely on the same assumption, though. Maybe https://github.com/apache/datafusion/issues/15220 can introduce an API to ask a SchemaAdapter for optional stats on columns it may generate?
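   For reference, roughly what the removed `f_up` rewrite did, per its doc comment above: any reference to a column absent from the file schema was replaced with a typed NULL literal, so no all-null array ever had to be materialized. The sketch below is a reconstruction, not the deleted code verbatim; the free-function name and import paths are illustrative. It makes the baked-in assumption explicit: a column missing from the file is always NULL, which only the default SchemaAdapter guarantees.

   ```rust
   use std::sync::Arc;

   use arrow::datatypes::Schema;
   use datafusion_common::tree_node::{Transformed, TreeNode};
   use datafusion_common::{Result, ScalarValue};
   use datafusion_physical_expr::expressions::{Column, Literal};
   use datafusion_physical_expr::PhysicalExpr;

   /// Sketch of the removed behavior: rewrite references to columns that are not in
   /// the file schema into typed NULL literals, so the filter never has to
   /// materialize an all-null array for them. Note the hard-coded assumption that a
   /// missing column is NULL, which only holds for the default SchemaAdapter.
   fn rewrite_missing_columns_to_null(
       expr: Arc<dyn PhysicalExpr>,
       file_schema: &Schema,
       table_schema: &Schema,
   ) -> Result<Arc<dyn PhysicalExpr>> {
       let rewritten = expr.transform(|node| {
           if let Some(column) = node.as_any().downcast_ref::<Column>() {
               if file_schema.field_with_name(column.name()).is_err() {
                   // The column only exists in the table schema: substitute a NULL
                   // literal of the table type for it.
                   let field = table_schema.field_with_name(column.name())?;
                   let null = ScalarValue::try_from(field.data_type())?;
                   return Ok(Transformed::yes(
                       Arc::new(Literal::new(null)) as Arc<dyn PhysicalExpr>
                   ));
               }
           }
           Ok(Transformed::no(node))
       })?;
       Ok(rewritten.data)
   }
   ```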
##########
datafusion/core/src/datasource/mod.rs:
##########
@@ -276,14 +276,8 @@
         ]);
 
         let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
-        let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
-        assert_eq!(indices, vec![0]);
-
-        let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
-
-        // Mapping fails because it tries to fill in a non-nullable column with nulls
-        let err = mapper.map_batch(file_batch).unwrap_err().to_string();
-        assert!(err.contains("Invalid argument error: Column 'a' is declared as non-nullable but contains null values"), "{err}");
+        let err = adapter.map_schema(&file_schema).unwrap_err().to_string();
+        assert!(err.contains("Error during planning: Column a is missing from the file schema, cannot be generated, and is non-nullable"));

Review Comment:
   This fails earlier, probably a good thing

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -616,51 +547,19 @@
         let expr = col("int64_list").is_not_null();
         let expr = logical2physical(&expr, &table_schema);
 
-        let candidate = FilterCandidateBuilder::new(expr, &table_schema, &table_schema)
-            .build(metadata)
-            .expect("building candidate");
-
-        assert!(candidate.is_none());
-    }
-
-    // If a column exists in the table schema but not the file schema it should be rewritten to a null expression
-    #[test]
-    fn test_filter_candidate_builder_rewrite_missing_column() {

Review Comment:
   See newly added e2e test

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -156,12 +142,8 @@
         &self.projection_mask
     }
 
-    fn evaluate(&mut self, mut batch: RecordBatch) -> ArrowResult<BooleanArray> {
-        if !self.projection.is_empty() {
-            batch = batch.project(&self.projection)?;
-        };
-
-        let batch = self.schema_mapping.map_partial_batch(batch)?;
+    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
+        let batch = self.schema_mapping.map_batch(batch)?;

Review Comment:
   Here is where we ditch `map_partial_batch` in favor of `map_batch`
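   To make concrete what `map_batch` is now responsible for, here is a small sketch of the default adapter's behavior for a nullable missing column, modeled on the `DefaultSchemaAdapterFactory` test shown above (the test name is made up and the import paths vary by DataFusion version): the mapped batch gains the missing column filled with NULLs, whereas a non-nullable missing column now fails earlier, at `map_schema`.

   ```rust
   use std::sync::Arc;

   use arrow::array::Array;
   use arrow::datatypes::{DataType, Field, Schema};
   use datafusion::datasource::schema_adapter::{
       DefaultSchemaAdapterFactory, SchemaAdapter, SchemaMapper,
   };
   use datafusion_common::record_batch;

   #[test]
   fn default_adapter_fills_missing_nullable_column_with_nulls() {
       // The table schema has a nullable "extra" column that the file lacks
       let table_schema = Arc::new(Schema::new(vec![
           Field::new("b", DataType::Float64, true),
           Field::new("extra", DataType::Int32, true),
       ]));
       let file_schema = Schema::new(vec![Field::new("b", DataType::Float64, true)]);

       let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
       let (mapper, indices) = adapter.map_schema(&file_schema).unwrap();
       // Only column "b" is read from the file
       assert_eq!(indices, vec![0]);

       // The mapped batch has both table columns; "extra" comes back as all NULLs
       let file_batch = record_batch!(("b", Float64, vec![1.0, 2.0])).unwrap();
       let mapped = mapper.map_batch(file_batch).unwrap();
       assert_eq!(mapped.num_columns(), 2);
       assert_eq!(mapped.column(1).null_count(), 2);
   }
   ```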
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -118,35 +114,25 @@ impl DatafusionArrowPredicate {
     /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
     pub fn try_new(
         candidate: FilterCandidate,
-        schema: &Schema,
         metadata: &ParquetMetaData,
         rows_pruned: metrics::Count,
         rows_matched: metrics::Count,
         time: metrics::Time,
-        schema_mapping: Arc<dyn SchemaMapper>,
     ) -> Result<Self> {
-        let schema = Arc::new(schema.project(&candidate.projection)?);
-        let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
-
-        // ArrowPredicate::evaluate is passed columns in the order they appear in the file
-        // If the predicate has multiple columns, we therefore must project the columns based
-        // on the order they appear in the file
-        let projection = match candidate.projection.len() {
-            0 | 1 => vec![],
-            2.. => remap_projection(&candidate.projection),
-        };
+        let projected_schema = candidate.filter_schema.clone();
+        let physical_expr =
+            reassign_predicate_columns(candidate.expr, &projected_schema, true)?;
 
         Ok(Self {
             physical_expr,
-            projection,
             projection_mask: ProjectionMask::roots(
                 metadata.file_metadata().schema_descr(),
                 candidate.projection,
             ),
             rows_pruned,
             rows_matched,
             time,
-            schema_mapping,
+            schema_mapping: candidate.schema_mapper,

Review Comment:
   Rename to `schema_mapper`?

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -194,9 +176,22 @@
 /// See the module level documentation for more information.
 pub(crate) struct FilterCandidate {
     expr: Arc<dyn PhysicalExpr>,
+    /// Estimate for the total number of bytes that will need to be processed

Review Comment:
   Just adding some docs

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -803,9 +689,10 @@
     fn basic_expr_doesnt_prevent_pushdown() {
         let table_schema = get_basic_table_schema();
 
-        let file_schema = Schema::new(vec![Field::new("str_col", DataType::Utf8, true)]);
+        let file_schema =
+            Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);

Review Comment:
   There is no `str_col` in the data returned by `get_basic_table_schema()` but there is `string_col`.

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -447,41 +406,12 @@
     can_be_pushed
 }
 
-/// Computes the projection required to go from the file's schema order to the projected
-/// order expected by this filter
-///
-/// Effectively this computes the rank of each element in `src`
-fn remap_projection(src: &[usize]) -> Vec<usize> {

Review Comment:
   I believe this is taken care of by SchemaAdapter now 😄. Again it would be nice to be able to point at a (maybe existing) test to confirm. Maybe I need to try removing this on `main` and confirming which tests break.
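   For context, the removed helper computed the rank of each element of `src`, i.e. the permutation that takes columns delivered in file-schema order into the order the filter expression expects. The sketch below is an equivalent computation based on that doc comment, not the deleted body verbatim:

   ```rust
   /// Rank of each element of `src`: the projection that reorders columns delivered
   /// in file-schema order into the order the filter expression expects.
   fn remap_projection(src: &[usize]) -> Vec<usize> {
       // Positions of `src` sorted by the file-schema index they point at
       let mut sorted_indexes: Vec<usize> = (0..src.len()).collect();
       sorted_indexes.sort_unstable_by_key(|idx| src[*idx]);

       // Invert that permutation to obtain each element's rank
       let mut projection = vec![0; src.len()];
       for (rank, idx) in sorted_indexes.into_iter().enumerate() {
           projection[idx] = rank;
       }
       projection
   }

   fn main() {
       // The predicate references file columns 5, 9 and 0 (in the order it expects
       // them); the reader hands them back in file order (0, 5, 9), so the batch
       // must be projected with [1, 2, 0] to restore the predicate's order.
       assert_eq!(remap_projection(&[5, 9, 0]), vec![1, 2, 0]);
   }
   ```

   If the SchemaMapper already reorders columns when mapping the batch, this permutation is indeed redundant, which is what the comment above suggests verifying with a test.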
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -682,42 +581,43 @@
             false,
         )]);
 
-        let table_ref = Arc::new(table_schema.clone());
-        let schema_adapter =
-            DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref), table_ref);
-        let (schema_mapping, _) = schema_adapter
-            .map_schema(&file_schema)
-            .expect("creating schema mapping");
-
-        let mut parquet_reader = parquet_reader_builder.build().expect("building reader");
-
-        // Parquet file is small, we only need 1 record batch
-        let first_rb = parquet_reader
-            .next()
-            .expect("expected record batch")
-            .expect("expected error free record batch");
-
         // Test all should fail
         let expr = col("timestamp_col").lt(Expr::Literal(
             ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
         ));
         let expr = logical2physical(&expr, &table_schema);
 
-        let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
-            .build(&metadata)
-            .expect("building candidate")
-            .expect("candidate expected");
+        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+        let table_schema = Arc::new(table_schema.clone());
+        let candidate = FilterCandidateBuilder::new(
+            expr,
+            file_schema.clone(),
+            table_schema.clone(),
+            schema_adapter_factory,
+        )
+        .build(&metadata)
+        .expect("building candidate")
+        .expect("candidate expected");
 
         let mut row_filter = DatafusionArrowPredicate::try_new(
             candidate,
-            &file_schema,
             &metadata,
             Count::new(),
             Count::new(),
             Time::new(),
-            Arc::clone(&schema_mapping),
         )
         .expect("creating filter predicate");
 
+        let mut parquet_reader = parquet_reader_builder
+            .with_projection(row_filter.projection().clone())

Review Comment:
   Moved down because it needs access to the row filter's projection