alamb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150940087

##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -509,8 +510,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///
     /// The default implementation bars all parent filters from being pushed down and adds no new filters.
     /// This is the safest option, making filter pushdown opt-in on a per-node basis.
+    ///
+    /// Since this may perform deep modifications to the plan tree it is called early in the optimization phase
+    /// and is not expected to be called multiple times on the same plan.
+    ///
+    /// A quick summary of the phases is below, see [`FilterPushdownPhase`] for more details:
+    /// - [`FilterPushdownPhase::Pre`]: Filters get pushed down before most other optimizations are applied.
+    /// At this stage the plan can be modified (e.g. when [`ExecutionPlan::handle_child_pushdown_result`] is called the plan may choose to return an entirely new plan tree)
+    /// but subsequent optimizations may also rewrite the plan tree drastically, thus it is *not guaranteed* that a [`PhysicalExpr`] can hold on to a reference to the plan tree.
+    /// During this phase static filters (such as `col = 1`) are pushed down.
+    /// - [`FilterPushdownPhase::Post`]: Filters get pushed down after most other optimizations are applied.
+    /// At this stage the plan tree is expected to be stable and not change drastically, and operators that do filter pushdown during this phase should also not change the plan tree.

Review Comment:
   I think the requirement is that the plan nodes don't change. Given that DynamicFilters can effectively hold pointers to existing ExecutionPlan instances, if a pass changes / removes / rewrites an `ExecutionPlan` that added a DynamicFilter, I am not sure what will happen 🤔

##########
datafusion/common/src/config.rs:
##########
@@ -614,6 +614,13 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
+        /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.

Review Comment:
   Would the idea be to prune hash table state, for example, if we knew some of the groups were no longer needed? I do think implementing more "late materialization" (aka turning on filter_pushdown) will help too.

##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -548,10 +563,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
     /// to dynamically build a result with a mix of supported and unsupported filters.
     ///
+    /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.

Review Comment:
   While I love documentation, I would personally suggest not duplicating the docs here, as duplicates can get out of sync, and instead leaving a link to `FilterPushdownPhase` and focusing on making that documentation as clear as possible.

##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -20,6 +20,39 @@ use std::vec::IntoIter;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownPhase {
+    /// Pushdown that happens before most other optimizations.
+    /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
+    /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten
+    /// by other optimizations.
+    /// Implementers are however allowed to modify the execution plan themselves during this phase, for example by returning a completely
+    /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
+    ///
+    /// [`ExecutionPlan`]: crate::ExecutionPlan
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
+    Pre,
+    /// Pushdown that happens after most other optimizations.
+    /// This pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
+    /// It is guaranteed that subsequent optimizations will not make large changes to the plan tree,

Review Comment:
   As above, I think it would be good to make it more precise what "large changes to the plan tree" means (basically I think it means don't remove existing ExecutionPlans? 🤔)

##########
datafusion/physical-optimizer/src/optimizer.rs:
##########
@@ -131,6 +131,8 @@ impl PhysicalOptimizer {
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
             Arc::new(LimitPushdown::new()),
+            // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan
+            Arc::new(FilterPushdown::new_post_optimization()),

Review Comment:
   This is so much nicer than adding it to `EnforceSorting` ❤️

   But shouldn't this be the final pass? (maybe right before SanityCheckPlan?)

   As I understand it, this filter pushdown pass has to be run after any pass that modifies the structure of the plan, and `ProjectionPushdown` may actually do that 🤔

   I also think it would be good to add a comment here explaining that `FilterPushdown::new_post_optimization()` must be run after all passes that change the structure of the plan, as it can generate pointers from one plan to another.

##########
datafusion/physical-optimizer/src/filter_pushdown.rs:
##########
@@ -362,17 +363,25 @@ use itertools::izip;
 /// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
 /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
 #[derive(Debug)]
-pub struct FilterPushdown {}
+pub struct FilterPushdown {
+    phase: FilterPushdownPhase,
+    name: String,
+}
 impl FilterPushdown {
-    pub fn new() -> Self {
-        Self {}
+    fn new(phase: FilterPushdownPhase) -> Self {
+        Self {
+            phase,
+            name: format!("FilterPushdown({phase})"),

Review Comment:
   I like `FilterPushdown` and `FilterPushdown(Dynamic)`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
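The pass-ordering concern and the naming suggestion in the review can be illustrated with a small toy model. This is a sketch under stated assumptions, not DataFusion code: `Phase`, `pass_name`, `TopKNode`, `DynamicFilter` and `post_pass_runs_last` are all hypothetical names invented here. The model assumes a dynamic filter "points at" a plan node by sharing that node's runtime state through an `Arc`, and it encodes the reviewer's suggestion that the dynamic pass should run right before `SanityCheckPlan`.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the two pushdown phases (not DataFusion's enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Pre,
    Post,
}

/// Hypothetical rule names following the reviewer's suggestion.
pub fn pass_name(phase: Phase) -> &'static str {
    match phase {
        Phase::Pre => "FilterPushdown",
        Phase::Post => "FilterPushdown(Dynamic)",
    }
}

/// Runtime state owned by one operator instance (think of a TopK threshold).
/// `None` means no threshold is known yet.
type SharedBound = Arc<RwLock<Option<i64>>>;

/// Hypothetical operator that publishes a tighter bound while executing.
pub struct TopKNode {
    bound: SharedBound,
}

impl TopKNode {
    pub fn new() -> Self {
        Self { bound: Arc::new(RwLock::new(None)) }
    }

    /// Simulates execution discovering a new threshold.
    pub fn observe(&self, threshold: i64) {
        *self.bound.write().unwrap() = Some(threshold);
    }

    /// Returns a filter sharing this node's state: the "pointer" from one
    /// part of the plan to another that the post-phase pass would create.
    pub fn dynamic_filter(&self) -> DynamicFilter {
        DynamicFilter { bound: Arc::clone(&self.bound) }
    }
}

/// Hypothetical dynamic filter: keeps values strictly below the threshold.
pub struct DynamicFilter {
    bound: SharedBound,
}

impl DynamicFilter {
    pub fn keeps(&self, value: i64) -> bool {
        match *self.bound.read().unwrap() {
            Some(threshold) => value < threshold,
            None => true, // nothing known yet, so nothing can be pruned
        }
    }
}

/// True if the dynamic pass is second to last and immediately followed by
/// `SanityCheckPlan`, so no later pass can rebuild the nodes it linked.
pub fn post_pass_runs_last(passes: &[&str]) -> bool {
    match passes {
        [.., second_last, last] => {
            *second_last == pass_name(Phase::Post) && *last == "SanityCheckPlan"
        }
        _ => false,
    }
}
```

For example, a filter taken from a `TopKNode` starts pruning once that node calls `observe(10)`; but if a later pass replaces the node with a freshly built `TopKNode`, the already pushed filter keeps reading the old, never-updated state. In this toy the only consequence is lost pruning; whether a real plan fails the same way is exactly the open question raised in the review.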