adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029972339


##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -467,6 +468,353 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    /// A physical optimizer rule that pushes down filters in the execution plan.
+    /// For example, consider the following plan:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │      FilterExec      │
+    /// │  filters = [ id=1]   │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │    DataSourceExec    │
+    /// │    projection = *    │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// Our goal is to move the `id = 1` filter from the `FilterExec` node to the `DataSourceExec` node.
+    /// If this filter is selective, pushing it down can avoid massive amounts of data being read from the source (the projection is `*` so all matching columns are read).
+    /// In this simple case we:
+    /// 1. Enter the recursion with no filters.
+    /// 2. Find the `FilterExec` node, which tells us that it has a filter to push down (see [`ExecutionPlan::filters_for_pushdown`] and `datafusion::physical_plan::filter::FilterExec`).
+    /// 3. Recurse down into its child (the `DataSourceExec` node), now carrying the filters `[id = 1]`.
+    /// 4. The `DataSourceExec` node tells us that it can handle the filter, and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+    /// 5. Since the `DataSourceExec` node has no children, we recurse back up the tree.
+    /// 6. We now tell the `FilterExec` node that it has a child that can handle the filter, and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+    ///    The `FilterExec` node can now return a new execution plan: either a copy of itself without that filter or, if it has no work left to do, the child node directly (a rough sketch of this handshake follows the updated plan below).
+    /// 7. Recurse back up to `CoalesceBatchesExec` and do nothing there, since it had no filters to push down.
+    ///
+    /// The new plan looks like:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///           │
+    ///           ▼
+    /// ┌──────────────────────┐
+    /// │    DataSourceExec    │
+    /// │    projection = *    │
+    /// │   filters = [ id=1]  │
+    /// └──────────────────────┘
+    /// ```
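+    ///
+    /// As a rough sketch of this handshake (the method names mirror those referenced above, but the glue code, variables, and result handling are illustrative, not the exact API):
+    ///
+    /// ```rust,ignore
+    /// // Steps 1-2: `FilterExec` advertises `id = 1` for pushdown.
+    /// let offered = filter_exec.filters_for_pushdown()?;
+    /// // Steps 3-5: recurse into the child carrying the offered filters;
+    /// // `DataSourceExec` reports that it handled `id = 1` exactly.
+    /// let child_result = data_source.try_pushdown_filters(&data_source_arc, &offered)?;
+    /// // Step 6: `FilterExec` learns the filter was handled exactly and
+    /// // rebuilds itself without it (possibly returning its child directly).
+    /// let new_plan = filter_exec.with_filter_pushdown_result(child_result)?;
+    /// ```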
+    ///
+    /// Let's consider a more complex example involving a `ProjectionExec` node in between the `FilterExec` and `DataSourceExec` nodes, where the projection creates a new column that the filter depends on.
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │      FilterExec      │
+    /// │    filters =         │
+    /// │     [cost>50,id=1]   │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │    ProjectionExec    │
+    /// │ cost = price * 1.2   │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │    DataSourceExec    │
+    /// │    projection = *    │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// We want to push down the `id = 1` filter to the `DataSourceExec` node, but we can't push down `cost > 50` because it requires the `ProjectionExec` node to be executed first:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │      FilterExec      │
+    /// │    filters =         │
+    /// │     [cost>50]        │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │    ProjectionExec    │
+    /// │ cost = price * 1.2   │
+    /// └──────────────────────┘
+    ///             │
+    ///             ▼
+    /// ┌──────────────────────┐
+    /// │    DataSourceExec    │
+    /// │    projection = *    │
+    /// │   filters = [ id=1]  │
+    /// └──────────────────────┘
+    /// ```
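+    ///
+    /// A node like `ProjectionExec` can decide which parent filters to forward by checking whether each filter references only columns that exist in its input; a rough sketch (the `references_only_input_columns` helper is hypothetical):
+    ///
+    /// ```rust,ignore
+    /// // Partition parent filters into those that can be pushed past the
+    /// // projection (they reference only input columns, e.g. `id = 1`) and
+    /// // those that must stay above it because they reference a
+    /// // projection-created column (e.g. `cost > 50`, since `cost = price * 1.2`).
+    /// let (pushable, retained): (Vec<PhysicalExprRef>, Vec<PhysicalExprRef>) = parent_filters
+    ///     .iter()
+    ///     .cloned()
+    ///     .partition(|f| references_only_input_columns(f, &input_schema));
+    /// ```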
+    ///
+    /// There are also cases where we may be able to push down filters within a subtree but not through the entire tree.
+    /// A good example of this is aggregation nodes:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ ProjectionExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    ///           │
+    ///           ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters = [sum > 10] │
+    /// └──────────────────────┘
+    ///           │
+    ///           ▼
+    /// ┌───────────────────────┐
+    /// │     AggregateExec     │
+    /// │    group by = [id]    │
+    /// │    aggregate =        │
+    /// │      [sum(price)]     │
+    /// └───────────────────────┘
+    ///           │
+    ///           ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters = [id=1]     │
+    /// └──────────────────────┘
+    ///          │
+    ///          ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// The transformation here is to push down the `id = 1` filter to the `DataSourceExec` node:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ ProjectionExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    ///           │
+    ///           ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters = [sum > 10] │
+    /// └──────────────────────┘
+    ///           │
+    ///           ▼
+    /// ┌───────────────────────┐
+    /// │     AggregateExec     │
+    /// │    group by = [id]    │
+    /// │    aggregate =        │
+    /// │      [sum(price)]     │
+    /// └───────────────────────┘
+    ///           │
+    ///           ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// │ filters = [id=1]     │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// The point here is that:
+    /// 1. We cannot push down `sum > 10` through the `AggregateExec` node into the `DataSourceExec` node.
+    ///    Any filters above the `AggregateExec` node are not pushed down through it.
+    ///    This is determined by calling [`ExecutionPlan::filter_pushdown_request`] on the `AggregateExec` node.
+    /// 2. We need to keep recursing into the tree so that we can discover the other `FilterExec` node and push down the `id = 1` filter.
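+    ///
+    /// A rough sketch of how an aggregation-like node might express this (the return type and variant names here are illustrative; see [`ExecutionPlan::filter_pushdown_request`] for the real API):
+    ///
+    /// ```rust,ignore
+    /// // The aggregate refuses to forward parent filters (they reference
+    /// // post-aggregation values such as `sum`), but the optimizer still
+    /// // recurses into its children so that the lower `FilterExec` gets a
+    /// // chance to push `id = 1` down to the scan.
+    /// fn filter_pushdown_request(&self, parent_filters: &[PhysicalExprRef]) -> Vec<PushdownRequest> {
+    ///     parent_filters.iter().map(|_| PushdownRequest::Unsupported).collect()
+    /// }
+    /// ```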
+    ///
+    /// It is also possible to push down filters through joins, and to push down filters that originate from joins.
+    /// For example, consider a hash join where we build a hash table of the left side and probe the right side
+    /// (ignoring why we would choose this order; typically it depends on the size of each table, etc.).
+    ///
+    /// ```text
+    ///              ┌─────────────────────┐
+    ///              │     FilterExec      │
+    ///              │ filters =           │
+    ///              │  [d.size > 100]     │
+    ///              └─────────────────────┘
+    ///                         │
+    ///                         │
+    ///              ┌──────────▼──────────┐
+    ///              │                     │
+    ///              │    HashJoinExec     │
+    ///              │ [u.dept@hash(d.id)] │
+    ///              │                     │
+    ///              └─────────────────────┘
+    ///                         │
+    ///            ┌────────────┴────────────┐
+    /// ┌──────────▼──────────┐   ┌──────────▼──────────┐
+    /// │   DataSourceExec    │   │   DataSourceExec    │
+    /// │  alias [users as u] │   │  alias [dept as d]  │
+    /// │                     │   │                     │
+    /// └─────────────────────┘   └─────────────────────┘
+    /// ```
+    ///
+    /// There are two pushdowns we can do here:
+    /// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` node for the `departments` table.
+    /// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
+    ///    rows from the `users` table that will be eliminated by the join.
+    ///    This can be done via a bloom filter or similar.
+    ///
+    /// ```text
+    ///              ┌─────────────────────┐
+    ///              │                     │
+    ///              │    HashJoinExec     │
+    ///              │ [u.dept@hash(d.id)] │
+    ///              │                     │
+    ///              └─────────────────────┘
+    ///                         │
+    ///            ┌────────────┴────────────┐
+    /// ┌──────────▼──────────┐   ┌──────────▼──────────┐
+    /// │   DataSourceExec    │   │   DataSourceExec    │
+    /// │  alias [users as u] │   │  alias [dept as d]  │
+    /// │ filters =           │   │  filters =          │
+    /// │   [dept@hash(d.id)] │
+    /// └─────────────────────┘   └─────────────────────┘
+    /// ```
+    ///
+    /// You may notice that in this case the filter is *dynamic*: the hash table is built
+    /// _after_ the `departments` table is read, at runtime.
+    /// We don't have a concrete `InList` filter or similar to push down at optimization time.
+    /// These sorts of dynamic filters are handled by building a specialized
+    /// [`PhysicalExpr`][datafusion_physical_expr::PhysicalExpr] that can be evaluated at runtime
+    /// and internally maintains a reference to the hash table or other state.
+    /// To make working with these sorts of dynamic filters more tractable we have the method `PhysicalExpr::snapshot`
+    /// (TODO: add reference after <https://github.com/apache/datafusion/pull/15568> is merged),
+    /// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
+    /// For a join this could mean converting it to an `InList` filter or a min/max filter, for example.
+    /// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
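+    ///
+    /// A very rough sketch of the idea (the struct, field, and helper names are illustrative; see `datafusion/physical-plan/src/dynamic_filters.rs` for the real implementation):
+    ///
+    /// ```rust,ignore
+    /// // A dynamic filter holds shared state that the join updates as it
+    /// // builds its hash table; the scan evaluates it at runtime.
+    /// struct DynamicJoinFilter {
+    ///     state: Arc<RwLock<JoinKeyState>>, // hypothetical shared state
+    /// }
+    ///
+    /// impl DynamicJoinFilter {
+    ///     // Try to freeze the current state into a plain, non-dynamic
+    ///     // predicate, e.g. an `InList` over the keys seen so far or a
+    ///     // min/max range filter.
+    ///     fn snapshot(&self) -> Option<Arc<dyn PhysicalExpr>> {
+    ///         self.state.read().ok()?.to_simple_expr() // hypothetical helper
+    ///     }
+    /// }
+    /// ```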
+    ///
+    /// Another form of dynamic filter is pushing down the state of a `TopK` operator for queries like
+    /// `SELECT * FROM t ORDER BY id LIMIT 10`:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │       TopK           │
+    /// │     limit = 10       │
+    /// │   order by = [id]    │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │    DataSourceExec    │
+    /// │    projection = *    │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// We can avoid processing large amounts of data by transforming this into:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │       TopK           │
+    /// │     limit = 10       │
+    /// │   order by = [id]    │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │    DataSourceExec    │
+    /// │    projection = *    │
+    /// │ filters =            │
+    /// │    [id < @ TopKHeap] │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// Now as we fill our `TopK` heap we can push down the state of the heap to the `DataSourceExec` node
+    /// to avoid reading files / row groups / pages / rows that could not possibly be in the top 10.
+    /// This is implemented in `datafusion/physical-plan/src/sorts/sort_filters.rs`.
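+    ///
+    /// A rough sketch of the mechanism (the shared threshold and its wiring are illustrative):
+    ///
+    /// ```rust,ignore
+    /// // Once the heap holds 10 rows, only rows with `id` smaller than the
+    /// // current worst entry can change the result, so the scan can skip
+    /// // everything else.
+    /// let threshold = Arc::new(AtomicI64::new(i64::MAX)); // hypothetical shared state
+    ///
+    /// // TopK side: after processing each batch, publish the heap's current
+    /// // maximum `id`.
+    /// threshold.store(heap_max_id, Ordering::Relaxed);
+    ///
+    /// // Scan side: the pushed-down filter evaluates `id < threshold` to
+    /// // prune files, row groups, pages, and rows.
+    /// ```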
+    fn try_pushdown_filters(
+        &self,
+        plan: &Arc<dyn ExecutionPlan>,
+        parent_filters: &[PhysicalExprRef],
+    ) -> Result<ExecutionPlanFilterPushdownResult> {

Review Comment:
   Now this is the only method added to `ExecutionPlan`. There's a bit of weirdness in having both `&self` _and_ `plan: &Arc<dyn ExecutionPlan>`, which are the same thing, just one is arc'ed and the other isn't. `Arc<Self>` won't cut it because you can't call methods on it, and for this to work I need a default implementation in `ExecutionPlan` that does the recursion (otherwise we force ourselves to immediately implement this method on all nodes, or we would not recurse into subtrees; see the test with `AggregateExec`).
   
   If you have a way to avoid this I'm happy to do it, but it also doesn't seem like a huge deal.
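   
   For context, a minimal sketch of the shape that default implementation could take (simplified; the `plan` field and `unsupported` constructor on `ExecutionPlanFilterPushdownResult` are hypothetical):
   
   ```rust
   fn try_pushdown_filters(
       &self,
       plan: &Arc<dyn ExecutionPlan>,
       parent_filters: &[PhysicalExprRef],
   ) -> Result<ExecutionPlanFilterPushdownResult> {
       // Recurse into each child without forwarding any parent filters
       // (a node must opt in to forwarding), rebuilding children as we go.
       let mut new_children = Vec::with_capacity(plan.children().len());
       for child in plan.children() {
           let result = child.try_pushdown_filters(child, &[])?;
           new_children.push(result.plan); // hypothetical field
       }
       // Rebuild `plan` with the new children and report that none of
       // `parent_filters` were handled here.
       let new_plan = plan.clone().with_new_children(new_children)?;
       Ok(ExecutionPlanFilterPushdownResult::unsupported(new_plan, parent_filters)) // hypothetical constructor
   }
   ```
   
   Having the `Arc` at hand is what lets the default body call `plan.children()` and rebuild via `with_new_children` without needing a `self: Arc<Self>` receiver.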


