alamb commented on code in PR #15801:
URL: https://github.com/apache/datafusion/pull/15801#discussion_r2070220686


##########
datafusion/core/tests/physical_optimizer/push_down_filter.rs:
##########
@@ -154,29 +153,25 @@ impl FileSource for TestSource {
 
     fn try_pushdown_filters(
         &self,
-        mut fd: FilterDescription,
+        filters: &[Arc<dyn PhysicalExpr>],

Review Comment:
   I would personally prefer keeping a struct as an argument as it
   1. Is easier to document / explain what the structures mean via comments
   2. it is clearer in the struct definition what types of operations (e.g. 
append only vs swap, etc) are done
   3. It is easier to add new fields if needed
   
   Basically,  something like
   
   ```rust
   pub struct FilterDescription {
           filters: &[Arc<dyn PhysicalExpr>],
   }
   ```
   ...
   
   This is a preference thing and i can easily see alternative opinions



##########
datafusion/core/tests/physical_optimizer/push_down_filter.rs:
##########
@@ -278,7 +273,7 @@ fn test_filter_collapse() {
         -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, pushdown_supported=true
       output:
         Ok:
-          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar

Review Comment:
   This actually looks like an improvement to me as now `a = foo` will be 
evaluated before `b=bar` as was done in the input plan. This might be important 
for short-circuiting, perhaps
   
   The prior version of this optimization seems to have reordered them



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -17,79 +17,207 @@
 
 use std::sync::Arc;
 
-use crate::ExecutionPlan;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
-#[derive(Clone, Debug)]
-pub struct FilterDescription {
-    /// Expressions coming from the parent nodes
-    pub filters: Vec<Arc<dyn PhysicalExpr>>,
+/// The result of, or a plan for, pushing down a filter into a child node.
+/// This contains references to filters so that nodes can mutate a filter
+/// before pushing it down to a child node (e.g. to adjust a projection)
+/// or can directly take ownership of `Unsupported` filters that their children
+/// could not handle.
+#[derive(Debug, Clone)]
+pub enum FilterPushdown {
+    Supported(Arc<dyn PhysicalExpr>),
+    Unsupported(Arc<dyn PhysicalExpr>),
 }
 
-impl Default for FilterDescription {
-    fn default() -> Self {
-        Self::empty()
+/// A thin wrapper around [`FilterPushdown`]s that allows for easy collection 
of
+/// supported and unsupported filters.
+#[derive(Debug, Clone)]
+pub struct FilterPushdowns(Vec<FilterPushdown>);
+
+impl FilterPushdowns {
+    /// Create a new FilterPushdowns with the given filters and their pushdown 
status.
+    pub fn new(pushdowns: Vec<FilterPushdown>) -> Self {
+        Self(pushdowns)
+    }
+
+    /// Create a new FilterPushdowns with all filters as supported.
+    pub fn all_supported(filters: &[Arc<dyn PhysicalExpr>]) -> Self {
+        let pushdowns = filters
+            .iter()
+            .map(|f| FilterPushdown::Supported(Arc::clone(f)))
+            .collect();
+        Self::new(pushdowns)
+    }
+
+    /// Create a new FilterPushdowns with all filters as unsupported.
+    pub fn all_unsupported(filters: &[Arc<dyn PhysicalExpr>]) -> Self {
+        let pushdowns = filters
+            .iter()
+            .map(|f| FilterPushdown::Unsupported(Arc::clone(f)))
+            .collect();
+        Self::new(pushdowns)
+    }
+
+    /// Transform all filters to supported, returning a new FilterPushdowns.
+    /// This does not modify the original FilterPushdowns.
+    pub fn as_supported(&self) -> Self {
+        let pushdowns = self
+            .0
+            .iter()
+            .map(|f| match f {
+                FilterPushdown::Supported(expr) => {
+                    FilterPushdown::Supported(Arc::clone(expr))
+                }
+                FilterPushdown::Unsupported(expr) => {
+                    FilterPushdown::Supported(Arc::clone(expr))
+                }
+            })
+            .collect();
+        Self::new(pushdowns)
+    }
+
+    /// Collect unsupported filters into a Vec, without removing them from the 
original
+    /// FilterPushdowns.
+    pub fn collect_unsupported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.0
+            .iter()
+            .filter_map(|f| match f {
+                FilterPushdown::Unsupported(expr) => Some(Arc::clone(expr)),
+                FilterPushdown::Supported(_) => None,
+            })
+            .collect()
+    }
+
+    /// Collect all filters as PhysicalExprs into a Vec, without removing them 
from the original
+    /// FilterPushdowns.
+    pub fn into_inner_filters(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.0
+            .iter()
+            .map(|f| match f {
+                FilterPushdown::Supported(expr) => Arc::clone(expr),
+                FilterPushdown::Unsupported(expr) => Arc::clone(expr),
+            })
+            .collect()
     }
-}
 
-impl FilterDescription {
-    /// Takes the filters out of the struct, leaving an empty vector in its 
place.
-    pub fn take_description(&mut self) -> Vec<Arc<dyn PhysicalExpr>> {
-        std::mem::take(&mut self.filters)
+    /// Return the inner `Vec<FilterPushdown>` without modifying the original 
FilterPushdowns.
+    pub fn into_inner(&self) -> Vec<FilterPushdown> {
+        self.0.clone()
     }
 
-    pub fn empty() -> FilterDescription {
-        Self { filters: vec![] }
+    /// Return an iterator over the inner `Vec<FilterPushdown>`.
+    pub fn iter(&self) -> impl Iterator<Item = &FilterPushdown> {
+        self.0.iter()
+    }
+
+    /// Return the number of filters in the inner `Vec<FilterPushdown>`.
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Check if the inner `Vec<FilterPushdown>` is empty.
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
     }
 }
 
-#[derive(Debug)]
-pub enum FilterPushdownSupport<T> {
-    Supported {
-        // Filter predicates which can be pushed down through the operator.
-        // NOTE that these are not placed into any operator.
-        child_descriptions: Vec<FilterDescription>,
-        // Possibly updated new operator
-        op: T,
-        // Whether the node is removed from the plan and the rule should be 
re-run manually
-        // on the new node.
-        // TODO: If TreeNodeRecursion supports Revisit mechanism, this flag 
can be removed
-        revisit: bool,
-    },
-    NotSupported,
+/// The result of pushing down filters into a child node.
+/// This is the result provided to nodes in 
[`ExecutionPlan::handle_child_pushdown_result`].
+/// Nodes process this result and convert it into a 
[`FilterPushdownPropagation`]
+/// that is returned to their parent.
+///
+/// [`ExecutionPlan::handle_child_pushdown_result`]: 
crate::ExecutionPlan::handle_child_pushdown_result
+#[derive(Debug, Clone)]
+pub struct ChildPushdownResult {
+    /// The combined result of pushing down each parent filter into each child.
+    /// For example, given the filters `[a, b]` and children `[1, 2, 3]` the 
matrix of responses:
+    ///
+    // | filter | child 1     | child 2   | child 3   | result      |
+    // |--------|-------------|-----------|-----------|-------------|
+    // | a      | Supported   | Supported | Supported | Supported   |
+    // | b      | Unsupported | Supported | Supported | Unsupported |
+    ///
+    /// That is: if any child marks a filter as unsupported or if the filter 
was not pushed
+    /// down into any child then the result is unsupported.
+    /// If at least one child and all children that received the filter 
mark it as supported
+    /// then the result is supported.
+    pub parent_filters: FilterPushdowns,
+    /// The result of pushing down each filter this node provided into each of 
its children.
+    /// This is not combined with the parent filters so that nodes can treat 
each child independently.
+    pub self_filters: Vec<FilterPushdowns>,
 }
 
-#[derive(Debug)]
-pub struct FilterPushdownResult<T> {
-    pub support: FilterPushdownSupport<T>,
-    // Filters which cannot be pushed down through the operator.
-    // NOTE that caller of try_pushdown_filters() should handle these 
remanining predicates,
-    // possibly introducing a FilterExec on top of this operator.
-    pub remaining_description: FilterDescription,
+/// The result of pushing down filters into a node that it returns to its 
parent.

Review Comment:
   these are great comments,



##########
datafusion/physical-optimizer/src/push_down_filter.rs:
##########
@@ -363,46 +362,139 @@ use datafusion_physical_plan::ExecutionPlan;
 /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
 #[derive(Debug)]
 pub struct PushdownFilter {}
-
 impl Default for PushdownFilter {
     fn default() -> Self {
         Self::new()
     }
 }
 
-pub type FilterDescriptionContext = PlanContext<FilterDescription>;
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum FilterPushdownState {
+    NoChildren,
+    Unsupported,
+    Supported,
+}
+
+fn push_down_filters(
+    node: &Arc<dyn ExecutionPlan>,
+    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+    config: &ConfigOptions,
+) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+    let pushdown_plan = node.gather_filters_for_pushdown(&parent_filters, 
config)?;
+    let children = node
+        .children()
+        .into_iter()
+        .map(Arc::clone)
+        .collect::<Vec<_>>();
+    let mut parent_pushdown_result =

Review Comment:
   one idea to make this code easier to follow might be to break it up into 
multiple functions. Maybe even a struct or something that held the key state
   
   ```rust
   pub struct SingleNodePushdown {
     pushdown_plan: ....
     new_children: Vec<>
   }
   ...
   ```



##########
datafusion/physical-optimizer/src/push_down_filter.rs:
##########
@@ -363,46 +362,139 @@ use datafusion_physical_plan::ExecutionPlan;
 /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
 #[derive(Debug)]
 pub struct PushdownFilter {}
-
 impl Default for PushdownFilter {
     fn default() -> Self {
         Self::new()
     }
 }
 
-pub type FilterDescriptionContext = PlanContext<FilterDescription>;
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum FilterPushdownState {

Review Comment:
   it would be nice to add some comments here 



##########
datafusion/core/tests/physical_optimizer/push_down_filter.rs:
##########
@@ -154,29 +153,25 @@ impl FileSource for TestSource {
 
     fn try_pushdown_filters(
         &self,
-        mut fd: FilterDescription,
+        filters: &[Arc<dyn PhysicalExpr>],
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        let mut all_filters = 
filters.iter().map(Arc::clone).collect::<Vec<_>>();

Review Comment:
   For example, if this was in a struct, we could implement `clone()` for the 
struct and hide this complexity



##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -471,39 +471,53 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         Ok(None)
     }
 
-    /// Attempts to recursively push given filters from the top of the tree 
into leafs.
-    ///
-    /// This is used for various optimizations, such as:
-    ///
-    /// * Pushing down filters into scans in general to minimize the amount of 
data that needs to be materialzied.
-    /// * Pushing down dynamic filters from operators like TopK and Joins into 
scans.
-    ///
-    /// Generally the further down (closer to leaf nodes) that filters can be 
pushed, the better.
-    ///
-    /// Consider the case of a query such as `SELECT * FROM t WHERE a = 1 AND 
b = 2`.
-    /// With no filter pushdown the scan needs to read and materialize all the 
data from `t` and then filter based on `a` and `b`.
-    /// With filter pushdown into the scan it can first read only `a`, then 
`b` and keep track of
-    /// which rows match the filter.
-    /// Then only for rows that match the filter does it have to materialize 
the rest of the columns.
-    ///
-    /// # Default Implementation
-    ///
-    /// The default implementation assumes:
-    /// * Parent filters can't be passed onto children.
-    /// * This node has no filters to contribute.
-    ///
-    /// # Implementation Notes
-    ///
-    /// Most of the actual logic is implemented as a Physical Optimizer rule.
-    /// See [`PushdownFilter`] for more details.
-    ///
-    /// [`PushdownFilter`]: 
https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html
-    fn try_pushdown_filters(

Review Comment:
   to be honest, I prefer the previous implementation with just a single method 
on ExecutionPlan -- I think having two methods whose implementations must remain 
synchronized is more complicated to reason about and likely harder to work 
with



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to