adriangb commented on PR #15301: URL: https://github.com/apache/datafusion/pull/15301#issuecomment-2760193801
> I suppose we could make a physical filter pushdown plan starting with the TopK To be clear my point is: if instead of `ParquetSource` I have [LiquidCacheClientExec](https://github.com/XiangpengHao/liquid-cache/blob/933990ae36439585a45c497cede37a608ecac51f/src/client/src/client_exec.rs#L73), will the physical optimizer rule still know what to do? I think we could make it work as long as you implement something via `DataSourceExec` and we leave the methods on the `FileOpener` trait. But that won't work if someone wants to replace `DataSourceExec` altogether. I think the only way to make arbitrary custom `ExecutionPlan` nodes opt into filter pushdown is going to be to have that be part of the `ExecutionPlan` trait itself. But maybe I just an not that familiar with the physical optimizers. The main pros for a physical optimizer rule that I can think of are: 1. More in line with existing _optimizations_. 1. Avoids introducing a new trait and new methods on `ExecutionPlan`. The issues with a custom `PhysicalExpr` in my mind are: 1. Things that serialize a `PhysicalExpr` across the wire, e.g. https://github.com/XiangpengHao/liquid-cache does it via [serialize_physical_expr](https://github.com/apache/datafusion/blob/46f4024577f4e13b79d8739849ecf51af5ddc7c2/datafusion/proto/src/physical_plan/mod.rs#L1720-L1723). 1. Things hardcode against certain `PhysicalExpr`, in particular `PruningPredicate` -> this would be solved if we nixed PruningPredicate in favor of https://github.com/apache/datafusion/issues/8078, but that may be a long ways away and unless we can solve this we'd give up a lot of performance in the short term. 1. Possible performance / lock contention -> this is probably solvable by balancing update frequency. The main pro in my mind is that it can be updated "more frequently" than when a file is opened or when a record batch is processed. I take it the most useful place for this would be to push down filters from a TopK into a join? That does sound like a potential big win. Maybe we can balance this by doing something like: ```rust trait DynamicFilterSource { fn current_filter(&self) -> Arc<dyn PhysicalExpr>; } pub struct DynamicFilterPhysicalExpr { inner: Arc<dyn DynamicFiltersource>, } impl DynamicFilterPhysicalExpr { pub fn inner(&self) -> Arc<dyn PhysicalExpr> { conjunction(self.inner.current_filters()) } } impl PhysicalExpr for DynamicFilterPhysicalExpr { // call inner to get the current filter the evaluate } ``` Now we can have `TopK` have it's own implementation of `DynamicFilterSource` and package it up as a `PhysicalExpr`. Joins and such can always use the "latest" dynamic filters. Things that need to serialize it across the wire or can't deal with dynamic filters can "snapshot" it by calling `inner()`. We'll have to add it to the downcast matching of `PhysicalExpr` in several places but that sounds tractable. We still introduce a `DynamicFilterSource` trait but it's less pervasive in the sense that it it's an internal implementation detail of `DynamicFilterPhysicalExpr` and not referenced by `ExecutionPlan`. I would still keep the methods on `ExecutionPlan` to do the pushdown instead of the optimizer rule unless I'm wrong about optimizer rules not being able to deal with `LiquidCacheClientExec` and such. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org