discord9 commented on code in PR #19404:
URL: https://github.com/apache/datafusion/pull/19404#discussion_r2703049349


##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -347,10 +371,30 @@ impl ExecutionPlan for ProjectionExec {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        // TODO: In future, we can try to handle inverting aliases here.
-        // For the time being, we pass through untransformed filters, so filters on aliases are not handled.
-        // https://github.com/apache/datafusion/issues/17246
-        FilterDescription::from_children(parent_filters, &self.children())
+        // expand alias column to original expr in parent filters
+        let invert_alias_map = self.collect_reverse_alias()?;
+
+        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
+
+        for filter in parent_filters {
+            let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
+            let rewritten = Arc::clone(&filter).rewrite(&mut rewriter)?.data;
+
+            if rewriter.has_unmapped_columns() {
+                // Filter contains columns that cannot be mapped through projection
+                // Mark as unsupported - cannot push down
+                child_parent_filters.push(PushedDownPredicate::unsupported(filter));

Review Comment:
   > Previously `FilterDescription::from_children(parent_filters, &self.children())` handled this by calling `ChildFilterDescription::from_child`.
   > 
   > I think if we're going to have this check it should not be a part of `PhysicalColumnRewriter`, that's what is smelling weird to me I think.
   > 
   > How about we extract the logic in `ChildFilterDescription::from_child` into a standalone function such as:
   > 
   > ```diff
   > diff --git a/datafusion/physical-plan/src/filter_pushdown.rs 
b/datafusion/physical-plan/src/filter_pushdown.rs
   > index 1274e954e..85c220756 100644
   > --- a/datafusion/physical-plan/src/filter_pushdown.rs
   > +++ b/datafusion/physical-plan/src/filter_pushdown.rs
   > @@ -37,8 +37,9 @@
   >  use std::collections::HashSet;
   >  use std::sync::Arc;
   >  
   > -use datafusion_common::Result;
   > -use datafusion_physical_expr::utils::{collect_columns, 
reassign_expr_columns};
   > +use arrow_schema::Schema;
   > +use datafusion_common::{Result, tree_node::{TreeNode, TreeNodeRecursion}};
   > +use datafusion_physical_expr::{expressions::Column, 
utils::reassign_expr_columns};
   >  use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
   >  use itertools::Itertools;
   >  
   > @@ -306,6 +307,53 @@ pub struct ChildFilterDescription {
   >      pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
   >  }
   >  
   > +/// A utility for checking whether a filter expression can be pushed down
   > +/// to a child node based on column availability.
   > +///
   > +/// This checker validates that all columns referenced in a filter 
expression
   > +/// exist in the target schema. If any column in the filter is not present
   > +/// in the schema, the filter cannot be pushed down to that child.
   > +pub struct FilterColumnChecker<'a> {
   > +    column_names: HashSet<&'a str>,
   > +}
   > +
   > +impl<'a> FilterColumnChecker<'a> {
   > +    /// Creates a new [`FilterColumnChecker`] from the given schema.
   > +    ///
   > +    /// Extracts all column names from the schema's fields to build
   > +    /// a lookup set for efficient column existence checks.
   > +    fn new(input_schema: &'a Schema) -> Self {
   > +        let column_names: HashSet<&str> = input_schema
   > +            .fields()
   > +            .iter()
   > +            .map(|f| f.name().as_str())
   > +            .collect();
   > +        Self { column_names }
   > +    }
   > +
   > +    /// Checks whether a filter expression can be pushed down to the child
   > +    /// whose schema was used to create this checker.
   > +    ///
   > +    /// Returns `true` if all [`Column`] references in the filter 
expression
   > +    /// exist in the target schema, `false` otherwise.
   > +    ///
   > +    /// This method traverses the entire expression tree, checking each
   > +    /// column reference against the available column names.
   > +    fn can_pushdown(&self, filter: &Arc<dyn PhysicalExpr>) -> bool {
   > +        let mut can_apply = true;
   > +        filter.apply(|expr| {
   > +            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
   > +                if !self.column_names.contains(column.name()) {
   > +                    can_apply = false;
   > +                    return Ok(TreeNodeRecursion::Stop);
   > +                }
   > +            }
   > +            Ok(TreeNodeRecursion::Continue)
   > +        }).expect("infallible traversal");
   > +        can_apply
   > +    }
   > +}
   > +
   >  impl ChildFilterDescription {
   >      /// Build a child filter description by analyzing which parent 
filters can be pushed to a specific child.
   >      ///
   > @@ -320,26 +368,14 @@ impl ChildFilterDescription {
   >      ) -> Result<Self> {
   >          let child_schema = child.schema();
   >  
   > -        // Get column names from child schema for quick lookup
   > -        let child_column_names: HashSet<&str> = child_schema
   > -            .fields()
   > -            .iter()
   > -            .map(|f| f.name().as_str())
   > -            .collect();
   > +        // Build a set of column names in the child schema for quick 
lookup
   > +        let checker = FilterColumnChecker::new(&child_schema);
   >  
   >          // Analyze each parent filter
   >          let mut child_parent_filters = 
Vec::with_capacity(parent_filters.len());
   >  
   >          for filter in parent_filters {
   > -            // Check which columns the filter references
   > -            let referenced_columns = collect_columns(filter);
   > -
   > -            // Check if all referenced columns exist in the child schema
   > -            let all_columns_exist = referenced_columns
   > -                .iter()
   > -                .all(|col| child_column_names.contains(col.name()));
   > -
   > -            if all_columns_exist {
   > +            if checker.can_pushdown(filter) {
   >                  // All columns exist in child - we can push down
   >                  // Need to reassign column indices to match child schema
   >                  let reassigned_filter =
   > ```
   > 
   > Then we can call that standalone function here.
   
   Sounds much better than my design; let me implement it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to