zhuqi-lucas commented on code in PR #19545:
URL: https://github.com/apache/datafusion/pull/19545#discussion_r2665009502
##########
datafusion/datasource-parquet/src/supported_predicates.rs:
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Registry of physical expressions that support nested list column pushdown
+//! to the Parquet decoder.
+//!
+//! This module provides a trait-based approach for determining which predicates
+//! can be safely evaluated on nested list columns during Parquet decoding.
+
+use std::sync::Arc;
+
+use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr};
+use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
+
+// `ScalarUDFExpr` is currently an alias of `ScalarFunctionExpr` in this crate,
+// but keep a separate type to support potential future divergence.
+type ScalarUDFExpr = ScalarFunctionExpr;
+
+/// Trait for physical expressions that support list column pushdown during
+/// Parquet decoding.
+///
+/// This trait provides a type-safe mechanism for identifying expressions that
+/// can be safely pushed down to the Parquet decoder for evaluation on nested
+/// list columns.
+///
+/// # Implementation Notes
+///
+/// Expression types in external crates cannot directly implement this trait
+/// due to Rust's orphan rules. Instead, we use a blanket implementation that
+/// delegates to a registration mechanism.
+///
+/// # Examples
+///
+/// ```ignore
+/// use datafusion_physical_expr::PhysicalExpr;
+/// use datafusion_datasource_parquet::SupportsListPushdown;
+///
+/// let expr: Arc<dyn PhysicalExpr> = ...;
+/// if expr.supports_list_pushdown() {
+///     // Can safely push down to Parquet decoder
+/// }
+/// ```
+pub trait SupportsListPushdown {
+    /// Returns `true` if this expression supports list column pushdown.
+    fn supports_list_pushdown(&self) -> bool;
+}
+
+/// Blanket implementation for all physical expressions.
+///
+/// This delegates to specialized predicates that check whether the concrete
+/// expression type is registered as supporting list pushdown. This design
+/// allows the trait to work with expression types defined in external crates.
+impl SupportsListPushdown for dyn PhysicalExpr {
+    fn supports_list_pushdown(&self) -> bool {
+        is_null_check(self) || is_supported_scalar_function(self)
+    }
+}
+
+/// Checks if an expression is a NULL or NOT NULL check.
+///
+/// These checks are universally supported for all column types.
+fn is_null_check(expr: &dyn PhysicalExpr) -> bool {
+    expr.as_any().downcast_ref::<IsNullExpr>().is_some()
+        || expr.as_any().downcast_ref::<IsNotNullExpr>().is_some()
+}
+
+/// Checks if an expression is a scalar function registered for list pushdown.
+///
+/// Returns `true` if the expression is a `ScalarFunctionExpr` whose function
+/// is in the registry of supported operations.
+fn is_supported_scalar_function(expr: &dyn PhysicalExpr) -> bool {
+    scalar_function_name(expr).is_some_and(|name| {
+        // Registry of verified array functions
+        matches!(name, "array_has" | "array_has_all" | "array_has_any")
+    })
+}
+
+fn scalar_function_name(expr: &dyn PhysicalExpr) -> Option<&str> {
+    expr.as_any()
+        .downcast_ref::<ScalarFunctionExpr>()
+        .map(ScalarFunctionExpr::name)
+        .or_else(|| {
+            expr.as_any()
+                .downcast_ref::<ScalarUDFExpr>()
+                .map(ScalarUDFExpr::name)
+        })
+}

Review Comment:
   Why not just use `ScalarFunctionExpr`?
   ```suggestion
   fn scalar_function_name(expr: &dyn PhysicalExpr) -> Option<&str> {
       expr.as_any()
           .downcast_ref::<ScalarFunctionExpr>()
           .map(ScalarFunctionExpr::name)
   }
   ```
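A standalone sketch backing this suggestion: a Rust type alias does not introduce a new type, so both `downcast_ref` calls compare against the same `TypeId` and the `or_else` branch is unreachable. The unit struct below is a stand-in for illustration, not the real `ScalarFunctionExpr`.

```rust
use std::any::TypeId;

// Stand-in type; only the alias relationship matters for this demonstration.
struct ScalarFunctionExpr;
type ScalarUDFExpr = ScalarFunctionExpr;

fn main() {
    // `Any::downcast_ref` matches on TypeId, and an alias shares the TypeId
    // of its target, so the second downcast can never succeed where the
    // first one failed.
    assert_eq!(
        TypeId::of::<ScalarFunctionExpr>(),
        TypeId::of::<ScalarUDFExpr>()
    );
}
```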
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -247,34 +273,78 @@ struct PushdownChecker<'schema> {
     projected_columns: bool,
     /// Indices into the file schema of columns required to evaluate the expression.
     required_columns: BTreeSet<usize>,

Review Comment:
   Not related to this PR, but could we optimize `required_columns` to a `Vec<usize>` for small column sets?
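A minimal sketch of that idea, assuming the set is only needed as a sorted, de-duplicated list once the expression traversal finishes (the helper name is hypothetical, not DataFusion API):

```rust
// Collect into a Vec and de-duplicate once at the end; for the handful of
// columns a typical predicate touches, this avoids BTreeSet's per-insert
// node allocations and keeps the data contiguous.
fn collect_required_columns(indices: impl IntoIterator<Item = usize>) -> Vec<usize> {
    let mut cols: Vec<usize> = indices.into_iter().collect();
    cols.sort_unstable();
    cols.dedup();
    cols
}

fn main() {
    assert_eq!(collect_required_columns([3, 1, 3, 2, 1]), vec![1, 2, 3]);
}
```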
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -297,34 +367,137 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
     }
 }
 
+/// Describes the nested column behavior for filter pushdown.
+///
+/// This enum makes explicit the different states a predicate can be in
+/// with respect to nested column handling during Parquet decoding.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum NestedColumnSupport {
+    /// Expression references only primitive (non-nested) columns.
+    /// These can always be pushed down to the Parquet decoder.
+    PrimitiveOnly,
+    /// Expression references list columns with supported predicates
+    /// (e.g., array_has, array_has_all, IS NULL).
+    /// These can be pushed down to the Parquet decoder.
+    ListsSupported,
+    /// Expression references unsupported nested types (e.g., structs)
+    /// or list columns without supported predicates.
+    /// These cannot be pushed down and must be evaluated after decoding.
+    Unsupported,
+}
+
+#[derive(Debug)]
+struct PushdownColumns {
+    required_columns: BTreeSet<usize>,
+    nested: NestedColumnSupport,
+}
+
 /// Checks if a given expression can be pushed down to the parquet decoder.
 ///
-/// Returns `Some(column_indices)` if the expression can be pushed down,
-/// where `column_indices` are the indices into the file schema of all columns
+/// Returns `Some(PushdownColumns)` if the expression can be pushed down,
+/// where the struct contains the indices into the file schema of all columns
 /// required to evaluate the expression.
 ///
 /// Returns `None` if the expression cannot be pushed down (e.g., references
-/// non-primitive types or columns not in the file).
+/// unsupported nested types or columns not in the file).
 fn pushdown_columns(
     expr: &Arc<dyn PhysicalExpr>,
     file_schema: &Schema,
-) -> Result<Option<Vec<usize>>> {
-    let mut checker = PushdownChecker::new(file_schema);
+) -> Result<Option<PushdownColumns>> {
+    let allow_list_columns = supports_list_predicates(expr);
+    let mut checker = PushdownChecker::new(file_schema, allow_list_columns);
     expr.visit(&mut checker)?;
-    Ok((!checker.prevents_pushdown())
-        .then_some(checker.required_columns.into_iter().collect()))
+    Ok((!checker.prevents_pushdown()).then_some(PushdownColumns {
+        required_columns: checker.required_columns,
+        nested: checker.nested_behavior,
+    }))
+}
+
+fn leaf_indices_for_roots(
+    root_indices: &[usize],
+    schema_descr: &SchemaDescriptor,
+    nested: NestedColumnSupport,
+) -> Vec<usize> {
+    // For primitive-only columns, root indices ARE the leaf indices
+    if nested == NestedColumnSupport::PrimitiveOnly {
+        return root_indices.to_vec();
+    }
+
+    // For nested columns (lists or structs), we need to expand to all leaf columns

Review Comment:
   It seems that list columns have only a single leaf (the item field); correct me if I am wrong.
   ```suggestion
       // For List columns, expand to the single leaf column (item field)
       // For Struct columns (unsupported), this would expand to multiple leaves
   ```

##########
datafusion/datasource-parquet/src/supported_predicates.rs:
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Registry of physical expressions that support nested list column pushdown
+//! to the Parquet decoder.
+//!
+//! This module provides a trait-based approach for determining which predicates
+//! can be safely evaluated on nested list columns during Parquet decoding.
+
+use std::sync::Arc;
+
+use datafusion_physical_expr::expressions::{IsNotNullExpr, IsNullExpr};
+use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
+
+// `ScalarUDFExpr` is currently an alias of `ScalarFunctionExpr` in this crate,
+// but keep a separate type to support potential future divergence.
+type ScalarUDFExpr = ScalarFunctionExpr;

Review Comment:
   I can't see a difference here; why not just use `ScalarFunctionExpr`?

##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -177,12 +189,19 @@ pub(crate) struct FilterCandidate {
     /// Can this filter use an index (e.g. a page index) to prune rows?
     can_use_index: bool,
     /// Column indices into the parquet file schema required to evaluate this filter.
-    projection: Vec<usize>,
+    projection: LeafProjection,
     /// The Arrow schema containing only the columns required by this filter,
     /// projected from the file's Arrow schema.
     filter_schema: SchemaRef,
 }
+
+/// Tracks the projection of an expression in both root and leaf coordinates.

Review Comment:
   ```suggestion
   /// Projection specification for nested columns using Parquet leaf column indices.
   ///
   /// For nested types like List and Struct, Parquet stores data in leaf columns
   /// (the primitive fields). This struct tracks which leaf columns are needed
   /// to evaluate a filter expression.
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
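A standalone check of the single-leaf claim using the `parquet` crate directly (the schema string and its field names are illustrative assumptions, not taken from this PR): a List column expands to exactly one leaf, while a struct contributes one leaf per primitive field.

```rust
use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() {
    // One List<String> root and one two-field struct root.
    let message = "
        message schema {
            optional group tags (LIST) {
                repeated group list {
                    optional binary element (UTF8);
                }
            }
            optional group point {
                optional int32 x;
                optional int32 y;
            }
        }
    ";
    let descr = SchemaDescriptor::new(Arc::new(parse_message_type(message).unwrap()));
    // Two root columns expand to three leaves: tags.list.element, point.x,
    // and point.y. The List root owns exactly one leaf (its item field).
    assert_eq!(descr.num_columns(), 3);
}
```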
