alamb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150940087


##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -509,8 +510,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///
     /// The default implementation bars all parent filters from being pushed down and adds no new filters.
     /// This is the safest option, making filter pushdown opt-in on a per-node basis.
+    ///
+    /// Since this may perform deep modifications to the plan tree it is called early in the optimization phase
+    /// and is not expected to be called multiple times on the same plan.
+    ///
+    /// A quick summary of the phases is below, see [`FilterPushdownPhase`] for more details:
+    /// - [`FilterPushdownPhase::Pre`]: Filters get pushed down before most other optimizations are applied.
+    ///   At this stage the plan can be modified (e.g. when [`ExecutionPlan::handle_child_pushdown_result`] is called the plan may choose to return an entirely new plan tree)
+    ///   but subsequent optimizations may also rewrite the plan tree drastically, thus it is *not guaranteed* that a [`PhysicalExpr`] can hold on to a reference to the plan tree.
+    ///   During this phase static filters (such as `col = 1`) are pushed down.
+    /// - [`FilterPushdownPhase::Post`]: Filters get pushed down after most other optimizations are applied.
+    ///   At this stage the plan tree is expected to be stable and not change drastically, and operators that do filter pushdown during this phase should also not change the plan tree.

Review Comment:
   I think the requirement is that the plan nodes don't change. Given that DynamicFilters effectively can have pointers to existing `ExecutionPlan` instances, if a pass changes, removes, or rewrites an `ExecutionPlan` that added a DynamicFilter, I am not sure what will happen 🤔
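To make the concern concrete, here is a minimal std-only sketch of the hazard, assuming a dynamic filter is shared through an `Arc` between the node that produces it and the scan that consumes it. `DynamicFilterState`, `ProducerNode`, `ScanNode`, `rewrite_producer`, and `scan_bound` are hypothetical stand-ins, not DataFusion APIs: if a later pass rebuilds the producer instead of reusing it, the scan keeps reading orphaned state and never sees the bound.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical shared dynamic-filter state: a producer node (e.g. a TopK or
/// hash join) writes a bound at execution time and a scan node reads it.
#[derive(Debug, Default)]
struct DynamicFilterState {
    upper_bound: RwLock<Option<i64>>,
}

/// Hypothetical producer node that owns a handle to the shared state.
struct ProducerNode {
    filter: Arc<DynamicFilterState>,
}

impl ProducerNode {
    fn new() -> Self {
        Self { filter: Arc::new(DynamicFilterState::default()) }
    }

    /// Called during execution once the producer learns a tighter bound.
    fn publish_bound(&self, bound: i64) {
        *self.filter.upper_bound.write().unwrap() = Some(bound);
    }
}

/// Hypothetical scan node holding its own clone of the handle, as it would
/// after a post-optimization pushdown pass.
struct ScanNode {
    filter: Arc<DynamicFilterState>,
}

impl ScanNode {
    fn current_bound(&self) -> Option<i64> {
        *self.filter.upper_bound.read().unwrap()
    }
}

/// Simulates a later pass that rebuilds the producer instead of reusing the
/// existing node, so the new node gets fresh, unshared filter state.
fn rewrite_producer(_old: ProducerNode) -> ProducerNode {
    ProducerNode::new()
}

/// Runs the scenario and returns the bound the scan observes.
fn scan_bound(rewrite_after_pushdown: bool) -> Option<i64> {
    let mut producer = ProducerNode::new();
    // Pushdown: the scan receives a handle to the producer's filter.
    let scan = ScanNode { filter: Arc::clone(&producer.filter) };
    if rewrite_after_pushdown {
        producer = rewrite_producer(producer);
    }
    // At execution time the (possibly rebuilt) producer publishes a bound.
    producer.publish_bound(100);
    scan.current_bound()
}
```

`scan_bound(false)` returns `Some(100)`, while `scan_bound(true)` returns `None`: the rewritten producer updates state the scan no longer shares.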



##########
datafusion/common/src/config.rs:
##########
@@ -614,6 +614,13 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.

Review Comment:
   Would the idea be to prune hash table state, for example, if we knew some of the groups were no longer needed?
   
   I do think implementing more "late materialization" (aka turning on `filter_pushdown`) will help too.
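For reference, the config option above describes operators generating filters that the file scan consumes. A minimal sketch of that mechanism, assuming a TopK-style operator (`ORDER BY v ASC LIMIT k`) over a single `i64` column; `DynamicThreshold`, `TopK`, and `scan_with_dynamic_filter` are hypothetical and not the PR's implementation:

```rust
use std::collections::BinaryHeap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical dynamic filter `value <= threshold`, shared between an
/// operator and the scan. It starts out permissive (i64::MAX).
struct DynamicThreshold(AtomicI64);

impl DynamicThreshold {
    fn new() -> Self {
        Self(AtomicI64::new(i64::MAX))
    }
    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
    fn set(&self, v: i64) {
        self.0.store(v, Ordering::Relaxed)
    }
}

/// Hypothetical TopK for `ORDER BY v ASC LIMIT k`: keeps the k smallest values
/// in a max-heap, whose root is the current k-th smallest value.
struct TopK {
    k: usize,
    heap: BinaryHeap<i64>,
    filter: Arc<DynamicThreshold>,
}

impl TopK {
    fn insert(&mut self, v: i64) {
        if self.k == 0 {
            return;
        }
        if self.heap.len() < self.k {
            self.heap.push(v);
        } else if v < *self.heap.peek().unwrap() {
            self.heap.pop();
            self.heap.push(v);
        }
        // Once the heap is full, values above the k-th smallest can never
        // enter, so tighten the filter the scan is reading.
        if self.heap.len() == self.k {
            self.filter.set(*self.heap.peek().unwrap());
        }
    }
}

/// Hypothetical scan loop that checks the dynamic filter before handing a
/// row to TopK. Returns the sorted top-k values and the rows skipped early.
fn scan_with_dynamic_filter(values: &[i64], k: usize) -> (Vec<i64>, usize) {
    let filter = Arc::new(DynamicThreshold::new());
    let mut topk = TopK { k, heap: BinaryHeap::new(), filter: Arc::clone(&filter) };
    let mut skipped = 0;
    for &v in values {
        if v > filter.get() {
            skipped += 1; // pruned at the scan, never reaches TopK
            continue;
        }
        topk.insert(v);
    }
    let mut top = topk.heap.into_vec();
    top.sort();
    (top, skipped)
}
```

For example, `scan_with_dynamic_filter(&[9, 8, 7, 1, 2, 10, 11, 3], 2)` returns `(vec![1, 2], 3)`: once the heap holds `{1, 2}`, the last three rows are rejected at the scan.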



##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -548,10 +563,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///   This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
     ///   to dynamically build a result with a mix of supported and unsupported filters.
     ///
+    /// There are two different phases in filter pushdown, which some operators may handle the same and some differently.

Review Comment:
   While I love documentation, I would personally suggest not duplicating the docs here, as duplicates can get out of sync. Instead, leave a link to `FilterPushdownPhase` and focus on making that documentation as clear as possible.
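One way to follow this suggestion is to keep the phase semantics only on the enum and have other docs point at it with a rustdoc intra-doc link. The sketch below uses a simplified, hypothetical `FilterKind` classification and a hypothetical free function `may_push_down`; it also assumes static filters are allowed in both phases, which the diff does not state explicitly.

```rust
/// The phases of filter pushdown. The semantics are documented here, once.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    /// Runs before most other optimizations. Only static filters
    /// (e.g. `col = 1`) are pushed, because later passes may rewrite the plan.
    Pre,
    /// Runs once the plan structure is settled, so filters that reference
    /// other plan nodes (dynamic filters) may also be pushed.
    Post,
}

/// Hypothetical filter classification used only in this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterKind {
    Static,
    Dynamic,
}

/// Returns whether a filter of `kind` may be pushed down during `phase`.
///
/// See [`FilterPushdownPhase`] for what each phase allows; the rules are
/// deliberately not repeated here so the two docs cannot drift apart.
pub fn may_push_down(kind: FilterKind, phase: FilterPushdownPhase) -> bool {
    match (kind, phase) {
        // Assumption: static filters are allowed in either phase.
        (FilterKind::Static, _) => true,
        (FilterKind::Dynamic, FilterPushdownPhase::Post) => true,
        (FilterKind::Dynamic, FilterPushdownPhase::Pre) => false,
    }
}
```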



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -20,6 +20,39 @@ use std::vec::IntoIter;
 
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownPhase {
+    /// Pushdown that happens before most other optimizations.
+    /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
+    /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten
+    /// by other optimizations.
+    /// Implementers are however allowed to modify the execution plan themselves during this phase, for example by returning a completely
+    /// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
+    ///
+    /// [`ExecutionPlan`]: crate::ExecutionPlan
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
+    Pre,
+    /// Pushdown that happens after most other optimizations.
+    /// This pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
+    /// It is guaranteed that subsequent optimizations will not make large changes to the plan tree,

Review Comment:
   As above, I think it would be good to make it more precise what "large changes to the plan tree" means (basically, I think it means: don't remove existing `ExecutionPlan`s? 🤔)
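One possible precise reading, offered only as a sketch: after any later pass, every node that produced a dynamic filter must still be in the plan as the *same* instance (`Arc::ptr_eq`), not merely an equivalent rebuilt node. `PlanNode`, `node`, `collect`, and `check_producers_survive` are hypothetical, not DataFusion types:

```rust
use std::sync::Arc;

/// Hypothetical minimal plan node for this sketch.
struct PlanNode {
    name: &'static str,
    children: Vec<Arc<PlanNode>>,
}

fn node(name: &'static str, children: Vec<Arc<PlanNode>>) -> Arc<PlanNode> {
    Arc::new(PlanNode { name, children })
}

/// Collects every node in the tree, pre-order.
fn collect(n: &Arc<PlanNode>, out: &mut Vec<Arc<PlanNode>>) {
    out.push(Arc::clone(n));
    for child in &n.children {
        collect(child, out);
    }
}

/// Checks that every filter-producing node is still present by identity.
/// Returns the name of the first producer that was removed or rebuilt.
fn check_producers_survive(
    after: &Arc<PlanNode>,
    producers: &[Arc<PlanNode>],
) -> Result<(), &'static str> {
    let mut nodes = Vec::new();
    collect(after, &mut nodes);
    for p in producers {
        if !nodes.iter().any(|n| Arc::ptr_eq(n, p)) {
            return Err(p.name);
        }
    }
    Ok(())
}
```

Under this reading, wrapping the existing `topk` node in a new parent is fine, but replacing it with a freshly built `topk` node makes the check return `Err("topk")`.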



##########
datafusion/physical-optimizer/src/optimizer.rs:
##########
@@ -131,6 +131,8 @@ impl PhysicalOptimizer {
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
             Arc::new(LimitPushdown::new()),
+            // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan
+            Arc::new(FilterPushdown::new_post_optimization()),

Review Comment:
   This is so much nicer than adding it to `EnforceSorting` ❤️
   
   But shouldn't this be the final pass? (maybe right before `SanityCheckPlan`?)
   
   As I understand it, this filter pushdown pass has to run after any pass that modifies the structure of the plan, and `ProjectionPushdown` may actually do that 🤔
   
   I also think it would be good to add a comment here explaining that `FilterPushdown::new_post_optimization()` must run after all passes that change the structure of the plan, as it can generate pointers from one plan to another.
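The ordering requirement could also be checked mechanically, for example in a test. A minimal sketch, assuming rules are identified by name strings; `check_post_pushdown_order` is hypothetical, and the rule names in the usage note are illustrative rather than DataFusion's actual default rule list:

```rust
/// Hypothetical check that the post-optimization filter pushdown pass runs
/// after every pass that may change plan structure, and immediately before
/// the final sanity check.
fn check_post_pushdown_order(
    rules: &[&str],
    structure_changing: &[&str],
    post_pushdown: &str,
    final_check: &str,
) -> Result<(), String> {
    let pos = |name: &str| rules.iter().position(|r| *r == name);
    let post = pos(post_pushdown).ok_or_else(|| format!("{post_pushdown} missing"))?;
    // Any structure-changing pass after the pushdown could invalidate
    // pointers that dynamic filters hold into the plan.
    for &rule in structure_changing {
        if let Some(i) = pos(rule) {
            if i > post {
                return Err(format!("{rule} runs after {post_pushdown}"));
            }
        }
    }
    match pos(final_check) {
        Some(i) if i == post + 1 => Ok(()),
        _ => Err(format!("{post_pushdown} should run right before {final_check}")),
    }
}
```

With rules `["LimitPushdown", "FilterPushdown(Post)", "ProjectionPushdown", "SanityCheckPlan"]` and `["LimitPushdown", "ProjectionPushdown"]` as the structure-changing set, the check returns `Err("ProjectionPushdown runs after FilterPushdown(Post)")`; moving `FilterPushdown(Post)` to just before `SanityCheckPlan` makes it return `Ok(())`.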



##########
datafusion/physical-optimizer/src/filter_pushdown.rs:
##########
@@ -362,17 +363,25 @@ use itertools::izip;
 /// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
 /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
 #[derive(Debug)]
-pub struct FilterPushdown {}
+pub struct FilterPushdown {
+    phase: FilterPushdownPhase,
+    name: String,
+}
 
 impl FilterPushdown {
-    pub fn new() -> Self {
-        Self {}
+    fn new(phase: FilterPushdownPhase) -> Self {
+        Self {
+            phase,
+            name: format!("FilterPushdown({phase})"),

Review Comment:
   I like `FilterPushdown` and `FilterPushdown(Dynamic)`
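A sketch of that naming, assuming the name is derived from the phase with a `match` rather than `format!("FilterPushdown({phase})")` (which needs a `Display` impl on `FilterPushdownPhase`). `new_post_optimization` appears in the diff above; `new_pre_optimization` and the `name`/`phase` accessors are hypothetical:

```rust
/// Stand-in for the PR's `FilterPushdownPhase`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    Pre,
    Post,
}

/// Sketch of the optimizer rule carrying its phase and a display name.
#[derive(Debug)]
pub struct FilterPushdown {
    phase: FilterPushdownPhase,
    name: String,
}

impl FilterPushdown {
    fn new(phase: FilterPushdownPhase) -> Self {
        // Naming suggested in the review: the plain name for the
        // pre-optimization pass, `(Dynamic)` for the post-optimization pass.
        let name = match phase {
            FilterPushdownPhase::Pre => "FilterPushdown",
            FilterPushdownPhase::Post => "FilterPushdown(Dynamic)",
        };
        Self { phase, name: name.to_string() }
    }

    /// Hypothetical constructor for the pre-optimization pass.
    pub fn new_pre_optimization() -> Self {
        Self::new(FilterPushdownPhase::Pre)
    }

    /// Constructor for the post-optimization pass (name taken from the diff).
    pub fn new_post_optimization() -> Self {
        Self::new(FilterPushdownPhase::Post)
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn phase(&self) -> FilterPushdownPhase {
        self.phase
    }
}
```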



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

