adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029972339
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -467,6 +468,353 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    /// A physical optimizer rule that pushes down filters in the execution plan.
+    /// For example, consider the following plan:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters = [id=1]     │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// Our goal is to move the `id = 1` filter from the `FilterExec` node to the `DataSourceExec` node.
+    /// If this filter is selective, pushing it down can avoid reading large amounts of data from the source
+    /// (the projection is `*`, so all columns are read for every matching row).
+    /// In this simple case we:
+    /// 1. Enter the recursion with no filters.
+    /// 2. Find the `FilterExec` node, which tells us that it has a filter (see [`ExecutionPlan::filters_for_pushdown`] and `datafusion::physical_plan::filter::FilterExec`).
+    /// 3. Recurse down into its children (the `DataSourceExec` node), now carrying the filters `[id = 1]`.
+    /// 4. The `DataSourceExec` node tells us that it can handle the filter, and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+    /// 5. Since the `DataSourceExec` node has no children, we recurse back up the tree.
+    /// 6. We now tell the `FilterExec` node that it has a child that can handle the filter, and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+    ///    The `FilterExec` node can now return a new execution plan: either a copy of itself without that filter or, if it has no work left to do, even the child node directly.
+    /// 7. We recurse back up to `CoalesceBatchesExec` and do nothing there since it had no filters to push down.
+    ///
+    /// The new plan looks like:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// │ filters = [id=1]     │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// Let's consider a more complex example involving a `ProjectionExec` node in between the `FilterExec` and `DataSourceExec` nodes that creates a new column that the filter depends on.
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters =            │
+    /// │   [cost>50,id=1]     │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ ProjectionExec       │
+    /// │ cost = price * 1.2   │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// We want to push down the `[id=1]` filter to the `DataSourceExec` node, but we can't push down `cost>50` because it requires the `ProjectionExec` node to be executed first:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ CoalesceBatchesExec  │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters =            │
+    /// │   [cost>50]          │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ ProjectionExec       │
+    /// │ cost = price * 1.2   │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// │ filters = [id=1]     │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// There are also cases where we may be able to push down filters within a subtree but not the entire tree.
+    /// A good example of this is aggregation nodes:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ ProjectionExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters = [sum > 10] │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌───────────────────────┐
+    /// │ AggregateExec         │
+    /// │ group by = [id]       │
+    /// │ aggregate =           │
+    /// │   [sum(price)]        │
+    /// └───────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters = [id=1]     │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// The transformation here is to push down the `[id=1]` filter to the `DataSourceExec` node:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ ProjectionExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ FilterExec           │
+    /// │ filters = [sum > 10] │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌───────────────────────┐
+    /// │ AggregateExec         │
+    /// │ group by = [id]       │
+    /// │ aggregate =           │
+    /// │   [sum(price)]        │
+    /// └───────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// │ filters = [id=1]     │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// The point here is that:
+    /// 1. We cannot push down `sum > 10` through the `AggregateExec` node into the `DataSourceExec` node.
+    ///    Any filters above the `AggregateExec` node are not pushed down.
+    ///    This is determined by calling [`ExecutionPlan::filter_pushdown_request`] on the `AggregateExec` node.
+    /// 2. We need to keep recursing into the tree so that we can discover the other `FilterExec` node and push down the `[id=1]` filter.
+    ///
+    /// It is also possible to push down filters through joins, as well as to push down filters that originate from the join itself.
+    /// For example, consider a hash join where we build a hash table of the left side and probe the right side
+    /// (ignoring why we would choose this order; typically it depends on the size of each table, etc.).
+    ///
+    /// ```text
+    ///              ┌─────────────────────┐
+    ///              │ FilterExec          │
+    ///              │ filters =           │
+    ///              │   [d.size > 100]    │
+    ///              └─────────────────────┘
+    ///                         │
+    ///                         │
+    ///              ┌──────────▼──────────┐
+    ///              │                     │
+    ///              │ HashJoinExec        │
+    ///              │ [u.dept@hash(d.id)] │
+    ///              │                     │
+    ///              └─────────────────────┘
+    ///                         │
+    ///            ┌────────────┴────────────┐
+    /// ┌──────────▼──────────┐   ┌──────────▼──────────┐
+    /// │ DataSourceExec      │   │ DataSourceExec      │
+    /// │ alias [users as u]  │   │ alias [dept as d]   │
+    /// │                     │   │                     │
+    /// └─────────────────────┘   └─────────────────────┘
+    /// ```
+    ///
+    /// There are two pushdowns we can do here:
+    /// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` node for the `departments` table.
+    /// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
+    ///    rows from the `users` table that will be eliminated by the join.
+    ///    This can be done via a Bloom filter or similar.
+    ///
+    /// ```text
+    ///              ┌─────────────────────┐
+    ///              │                     │
+    ///              │ HashJoinExec        │
+    ///              │ [u.dept@hash(d.id)] │
+    ///              │                     │
+    ///              └─────────────────────┘
+    ///                         │
+    ///            ┌────────────┴────────────┐
+    /// ┌──────────▼──────────┐   ┌──────────▼──────────┐
+    /// │ DataSourceExec      │   │ DataSourceExec      │
+    /// │ alias [users as u]  │   │ alias [dept as d]   │
+    /// │ filters =           │   │ filters =           │
+    /// │   [dept@hash(d.id)] │   │   [d.size > 100]    │
+    /// └─────────────────────┘   └─────────────────────┘
+    /// ```
+    ///
+    /// You may notice that in this case the filter is *dynamic*: the hash table is built at runtime,
+    /// _after_ the `departments` table is read.
+    /// We don't have a concrete `InList` filter or similar to push down at optimization time.
+    /// These sorts of dynamic filters are handled by building a specialized
+    /// [`PhysicalExpr`][datafusion_physical_expr::PhysicalExpr] that can be evaluated at runtime
+    /// and internally maintains a reference to the hash table or other state.
+    /// To make working with these sorts of dynamic filters more tractable we have the method `PhysicalExpr::snapshot`
+    /// (TODO: add reference after <https://github.com/apache/datafusion/pull/15568> is merged),
+    /// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
+    /// For a join this could mean converting it to an `InList` filter or a min/max filter, for example.
+    /// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
+    ///
+    /// Another form of dynamic filter is pushing down the state of a `TopK` operator for queries like
+    /// `SELECT * FROM t ORDER BY id LIMIT 10`:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ TopK                 │
+    /// │ limit = 10           │
+    /// │ order by = [id]      │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// We can avoid large amounts of data processing by transforming this into:
+    ///
+    /// ```text
+    /// ┌──────────────────────┐
+    /// │ TopK                 │
+    /// │ limit = 10           │
+    /// │ order by = [id]      │
+    /// └──────────────────────┘
+    ///            │
+    ///            ▼
+    /// ┌──────────────────────┐
+    /// │ DataSourceExec       │
+    /// │ projection = *       │
+    /// │ filters =            │
+    /// │   [id < @ TopKHeap]  │
+    /// └──────────────────────┘
+    /// ```
+    ///
+    /// Now as we fill our `TopK` heap we can push down the state of the heap to the `DataSourceExec` node
+    /// to avoid reading files / row groups / pages / rows that could not possibly be in the top 10.
+    /// This is implemented in `datafusion/physical-plan/src/sorts/sort_filters.rs`.
+    fn try_pushdown_filters(
+        &self,
+        plan: &Arc<dyn ExecutionPlan>,
+        parent_filters: &[PhysicalExprRef],
+    ) -> Result<ExecutionPlanFilterPushdownResult> {

Review Comment:
   Now this is the only added method to `ExecutionPlan`. There's a bit of weirdness in having to take both `&self` _and_ `plan: &Arc<dyn ExecutionPlan>`, which are the same node, just one is `Arc`'ed and the other isn't. `Arc<Self>` won't cut it because you can't call methods on it, and for this to work I need a default implementation in `ExecutionPlan` that does the recursion (otherwise we would force ourselves to immediately implement this method on all nodes, or we would not recurse into subtrees; see the test with `AggregateExec`). If you have a way to avoid this I'm happy to do it, but it also doesn't seem like a huge deal.
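   To illustrate the shape being discussed, here is a minimal sketch using simplified, hypothetical stand-ins (`Node` for `ExecutionPlan`, `Filter` for `PhysicalExprRef`, `PushdownResult` for `ExecutionPlanFilterPushdownResult`); this is not the PR's actual code, only an illustration of why a default trait method that does the recursion wants both `&self` and an `Arc`'d handle to the same node:

   ```rust
   // A rough, self-contained sketch: `Node`, `Filter`, and `PushdownResult`
   // are simplified stand-ins, not DataFusion's actual types.
   use std::sync::Arc;

   type Filter = String; // stand-in for `PhysicalExprRef`

   struct PushdownResult {
       // The (possibly rewritten) node; a real result would carry more detail,
       // e.g. which filters were handled exactly.
       new_plan: Option<Arc<dyn Node>>,
   }

   trait Node {
       fn children(&self) -> Vec<Arc<dyn Node>>;

       // Default implementation: recurse into children so that nodes which
       // never override this method are still traversed. `&self` is what the
       // call is dispatched on; `plan` is the same node, Arc'ed, so the default
       // body can hand back an owned handle (unchanged or rebuilt) without
       // requiring `self: Arc<Self>`.
       fn try_pushdown_filters(
           &self,
           plan: &Arc<dyn Node>,
           parent_filters: &[Filter],
       ) -> PushdownResult {
           for child in self.children() {
               // A real implementation would collect rewritten children and
               // rebuild `plan` with them; here we only recurse.
               let _ = child.try_pushdown_filters(&child, parent_filters);
           }
           PushdownResult {
               new_plan: Some(Arc::clone(plan)),
           }
       }
   }

   // A leaf node that does not override `try_pushdown_filters`:
   // the default recursion applies to it automatically.
   struct Leaf;

   impl Node for Leaf {
       fn children(&self) -> Vec<Arc<dyn Node>> {
           Vec::new()
       }
   }

   fn main() {
       let leaf: Arc<dyn Node> = Arc::new(Leaf);
       let result = leaf.try_pushdown_filters(&leaf, &["id = 1".to_string()]);
       assert!(result.new_plan.is_some());
   }
   ```

   With a default like this in the trait, nodes that never override the method (e.g. the `AggregateExec` case mentioned above) are still traversed, which is the point of keeping the recursion in a default implementation.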