adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029910723
##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -467,8 +467,106 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    /// Returns a set of filters that this operator owns but would like to be pushed down.
+    /// For example, a `TopK` operator may produce dynamic filters that reference its current state,
+    /// while a `FilterExec` will just hand off the filters it has as is.
+    /// The default implementation returns an empty vector.
+    /// These filters are applied row by row: rows for which any filter returns `false` or `NULL`
+    /// are filtered out, and rows for which every filter returns `true` are kept.
+    /// The expressions returned **must** always return `true` or `false`;
+    /// other truthy or falsy values are not allowed (e.g. `0`, `1`).
+    ///
+    /// # Returns
+    /// A vector of filters that this operator would like to push down.
+    /// These should be treated as the split conjunction of a `WHERE` clause.
+    /// That is, a query such as `WHERE a = 1 AND b = 2` would return two
+    /// filters: `a = 1` and `b = 2`.
+    /// They can always be assembled into a single filter using
+    /// [`split_conjunction`][datafusion_physical_expr::split_conjunction].
+    fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        Ok(Vec::new())
+    }
+
+    /// Checks which filters this node allows to be pushed down through it from a parent to a child.
+    /// For example, a `ProjectionExec` node can allow filters that only reference
+    /// columns it did not create through, but filters that reference columns it is creating cannot be pushed down any further.
+    /// That is, it only allows some filters through because it changes the schema of the data.
+    /// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.
+    /// `RepartitionExec` nodes allow all filters to be pushed down as they don't change the schema or cardinality.
+    fn filter_pushdown_request(

Review Comment:
I'm realizing there's another layer of complexity we need to add: operators like joins will want to partition the filters they get from their parents into different buckets, one for each child:

```sql
CREATE TABLE t1 (id int, col1 text);
CREATE TABLE t2 (id int, col2 text);
EXPLAIN VERBOSE SELECT * FROM t1 LEFT JOIN t2 ON t1.id = t2.id ORDER BY t1.col1 LIMIT 5;
```

This currently produces the plan:

```
| physical_plan | SortExec: TopK(fetch=5), expr=[col1@1 ASC NULLS LAST], preserve_partitioning=[false] |
|               |   CoalesceBatchesExec: target_batch_size=8192                                         |
|               |     HashJoinExec: mode=Partitioned, join_type=Left, on=[(id@0, id@0)]                 |
|               |       MemoryExec: partitions=1, partition_sizes=[0]                                   |
|               |       MemoryExec: partitions=1, partition_sizes=[0]                                   |
|               |                                                                                       |
```

Once we have the dynamic TopK filter pushdown, we'd want to be able to push the dynamic filter down into the left side of the join (t1).
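
To make the per-child bucketing concrete, here is a minimal, self-contained sketch of how a two-child join might split the filters it receives from its parent. It is not DataFusion code and not the API proposed in this PR: `Filter`, `JoinType`, `JoinFilterBuckets`, and `partition_filters_for_join` are all hypothetical names, and a filter is modeled only by the join-output column indices it references. It assumes the join output schema is the left columns followed by the right columns, and that a parent filter may only go below the join on a side the join type preserves (the left side for a left join).

```rust
use std::collections::BTreeSet;

/// Toy stand-in for a physical filter expression (hypothetical, not
/// DataFusion's `PhysicalExpr`): a label plus the column indices it references.
#[derive(Debug, Clone, PartialEq)]
pub struct Filter {
    pub label: String,
    pub columns: BTreeSet<usize>,
}

impl Filter {
    pub fn new(label: &str, columns: &[usize]) -> Self {
        Self {
            label: label.to_string(),
            columns: columns.iter().copied().collect(),
        }
    }
}

/// Simplified join types for this sketch (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Buckets produced by the join: one per child, plus the filters that
/// cannot go below the join and stay with whoever handed them down.
#[derive(Debug, Default, PartialEq)]
pub struct JoinFilterBuckets {
    pub left: Vec<Filter>,
    pub right: Vec<Filter>,
    pub retained: Vec<Filter>,
}

/// Splits parent filters into per-child buckets.
/// Assumption: output columns `0..left_width` come from the left child and
/// the remaining columns come from the right child.
pub fn partition_filters_for_join(
    filters: Vec<Filter>,
    left_width: usize,
    join_type: JoinType,
) -> JoinFilterBuckets {
    // Only push into a side whose rows the join preserves as-is.
    let left_allowed = matches!(join_type, JoinType::Inner | JoinType::Left);
    let right_allowed = matches!(join_type, JoinType::Inner | JoinType::Right);

    let mut buckets = JoinFilterBuckets::default();
    for filter in filters {
        let touches_left = filter.columns.iter().any(|&c| c < left_width);
        let touches_right = filter.columns.iter().any(|&c| c >= left_width);
        match (touches_left, touches_right) {
            (true, false) if left_allowed => buckets.left.push(filter),
            (false, true) if right_allowed => {
                // Rebase indices from the join schema onto the right child's schema.
                let columns = filter.columns.iter().map(|&c| c - left_width).collect();
                buckets.right.push(Filter { label: filter.label, columns });
            }
            // Mixed-side filters, column-free filters, and filters on a
            // non-preserved side are conservatively kept above the join.
            _ => buckets.retained.push(filter),
        }
    }
    buckets
}
```

For the query above, the join output would be `id@0, col1@1, id@2, col2@3`, so `left_width` is 2 and a TopK filter on `col1@1` lands in the `left` bucket (t1). A filter on `col2@3` would stay above the join for `join_type=Left`, because pushing it into t2 would turn removed rows into null-padded ones.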