adriangb commented on code in PR #15801:
URL: https://github.com/apache/datafusion/pull/15801#discussion_r2054603061


##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -438,55 +440,112 @@ impl ExecutionPlan for FilterExec {
         try_embed_projection(projection, self)
     }
 
-    fn try_pushdown_filters(
+    fn gather_filters_for_pushdown(
         &self,
-        mut fd: FilterDescription,
+        parent_filters: &[Arc<dyn PhysicalExpr>],
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
-        // Extend the filter descriptions
-        fd.filters.push(Arc::clone(&self.predicate));
-
-        // Extract the information
-        let child_descriptions = vec![fd];
-        let remaining_description = FilterDescription { filters: vec![] };
-        let filter_input = Arc::clone(self.input());
-
+    ) -> Result<FilterPushdownPlan> {
         if let Some(projection_indices) = self.projection.as_ref() {
-            // Push the filters down, but leave a ProjectionExec behind, 
instead of the FilterExec
-            let filter_child_schema = filter_input.schema();
-            let proj_exprs = projection_indices
+            // We need to invert the projection on any referenced columns in 
the filter
+            // Create a mapping from the output columns to the input columns 
(the inverse of the projection)
+            let inverse_projection = projection_indices
                 .iter()
-                .map(|p| {
-                    let field = filter_child_schema.field(*p).clone();
-                    (
-                        Arc::new(Column::new(field.name(), *p)) as Arc<dyn 
PhysicalExpr>,
-                        field.name().to_string(),
-                    )
+                .enumerate()
+                .map(|(i, &p)| (i, p))
+                .collect::<HashMap<_, _>>();
+            let predicate = Arc::clone(&self.predicate)
+                .transform_up(|expr| {
+                    if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                        let index = col.index();
+                        let index_in_input_schema =
+                            inverse_projection.get(&index).ok_or_else(|| {
+                                DataFusionError::Internal(format!(
+                                    "Column {} not found in projection",
+                                    index
+                                ))
+                            })?;
+                        Ok(Transformed::yes(Arc::new(Column::new(
+                            col.name(),
+                            *index_in_input_schema,
+                        )) as _))
+                    } else {
+                        Ok(Transformed::no(expr))
+                    }

Review Comment:
   There's a bug in this projection-inversion logic that I haven't been able
to spot yet... will continue looking later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to