alamb commented on code in PR #15301: URL: https://github.com/apache/datafusion/pull/15301#discussion_r2021619914
########## datafusion/datasource-parquet/src/source.rs: ##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }

     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
         self.pruning_predicate.as_ref()
     }

     /// Optional reference to this parquet scan's page pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]

Review Comment:
```suggestion
    #[deprecated(note = "ParquetSource no longer constructs a PruningPredicate.")]
```

########## datafusion/datasource-parquet/src/source.rs: ##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }

     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]

Review Comment:
```suggestion
    #[deprecated(note = "ParquetSource no longer constructs a PruningPredicate.")]
```

########## datafusion/physical-plan/src/dynamic_filters.rs: ##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    hash::Hash,
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result,
+};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr::{expressions::lit, utils::conjunction, PhysicalExpr};
+
+/// A source of dynamic runtime filters.
+///
+/// During query execution, operators implementing this trait can provide
+/// filter expressions that other operators can use to dynamically prune data.
+///
+/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs for examples.
+pub trait DynamicFilterSource: Send + Sync + std::fmt::Debug + 'static {
+    /// Take a snapshot of the current state of filtering, returning a non-dynamic PhysicalExpr.
+    /// This is used to e.g. serialize dynamic filters across the wire or to pass them into systems
+    /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete types of the expressions like `PruningPredicate` does).
+    /// For example, it is expected that this returns a relatively simple expression such as `col1 > 5` for a TopK operator or
+    /// `col2 IN (1, 2, ... N)` for a HashJoin operator.
+    fn snapshot_current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>>;
+}
+
+#[derive(Debug)]
+pub struct DynamicFilterPhysicalExpr {

Review Comment:
Could you add in some small context about what a `DynamicFilterPhysicalExpr` is?
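As one possible answer to the question above, here is a minimal, hypothetical sketch of a `DynamicFilterSource` implementation in the spirit of the `TopKDynamicFilterSource` the doc comment mentions. This is not code from the PR: `TopKThresholdSource`, its fields, and the import path of the trait are illustrative assumptions.

```rust
use std::sync::{Arc, RwLock};

use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;
// Trait added by this PR; the exact module path is assumed here.
use datafusion_physical_plan::dynamic_filters::DynamicFilterSource;

/// Hypothetical source a TopK operator could update as its heap evolves:
/// it tracks the current "worst" value kept by the heap for the sort column.
#[derive(Debug)]
struct TopKThresholdSource {
    /// The sort column, already bound to the input schema
    /// (e.g. built with `expressions::col("a", &schema)`).
    column: Arc<dyn PhysicalExpr>,
    /// Current cutoff; `None` until the heap has filled up.
    threshold: RwLock<Option<ScalarValue>>,
}

impl DynamicFilterSource for TopKThresholdSource {
    fn snapshot_current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
        let guard = self.threshold.read().unwrap();
        let expr: Arc<dyn PhysicalExpr> = match guard.as_ref() {
            // Heap not yet full: nothing can be pruned, so return a literal `true`.
            None => lit(true),
            // Freeze the current state into a plain, static comparison that
            // consumers such as `PruningPredicate` can downcast and analyze.
            Some(cutoff) => Arc::new(BinaryExpr::new(
                Arc::clone(&self.column),
                Operator::Lt,
                lit(cutoff.clone()),
            )),
        };
        Ok(vec![expr])
    }
}
```

A `DynamicFilterPhysicalExpr` wrapping such a source would re-snapshot it on each evaluation, which is what lets a scan prune more and more aggressively as the TopK heap tightens.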
########## datafusion/physical-plan/src/filter.rs: ##########
@@ -433,6 +433,22 @@ impl ExecutionPlan for FilterExec {
         }
         try_embed_projection(projection, self)
     }
+
+    fn push_down_filter(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let mut input = Arc::clone(&self.input);
+        if let Some(new_input) = input.push_down_filter(Arc::clone(&expr))? {

Review Comment:
If the filter was able to be pushed down into the filter's input, it seems like it would be more efficient to not *ALSO* evaluate it in the `FilterExec` itself.

########## datafusion/physical-plan/src/sorts/sort.rs: ##########
@@ -1224,6 +1245,28 @@ impl ExecutionPlan for SortExec {
             .with_preserve_partitioning(self.preserve_partitioning()),
         )))
     }
+
+    // Pass through filter pushdown.
+    // This often happens in partitioned plans with a TopK because we end up with 1 TopK per partition + a final TopK at the end.
+    // Implementing this pass-through allows the global/top/final TopK to push down filters to the partitions.
+    fn push_down_filter(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {

Review Comment:
I agree it doesn't need to be done in the first PR -- but we should probably track it as follow-on work.

########## datafusion/datasource-parquet/src/source.rs: ##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }

     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
         self.pruning_predicate.as_ref()

Review Comment:
I suggest we change this code to always return `None` (and remove the page pruning predicate from the source). Otherwise I predict that we'll end up with this field slowly bitrotting (aka it stops working but is not covered by tests).
```suggestion
        None
```
I suggest the same thing for `page_pruning_predicate`.

########## datafusion/datasource-parquet/src/source.rs: ##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }

     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
         self.pruning_predicate.as_ref()
     }

     /// Optional reference to this parquet scan's page pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn page_pruning_predicate(&self) -> Option<&Arc<PagePruningAccessPlanFilter>> {
         self.page_pruning_predicate.as_ref()

Review Comment:
```suggestion
        None
```

########## datafusion/datasource-parquet/src/source.rs: ##########
@@ -537,11 +525,10 @@ impl FileSource for ParquetSource {
             .expect("projected_statistics must be set");
         // When filters are pushed down, we have no way of knowing the exact statistics.
         // Note that pruning predicate is also a kind of filter pushdown.
-        // (bloom filters use `pruning_predicate` too)
-        if self.pruning_predicate().is_some()
-            || self.page_pruning_predicate().is_some()
-            || (self.predicate().is_some() && self.pushdown_filters())
-        {
+        // (bloom filters use `pruning_predicate` too).
+        // Because filter pushdown may happen dynamically as long as there is a predicate

Review Comment:
👍

########## datafusion/datasource/src/source.rs: ##########
@@ -79,6 +79,13 @@ pub trait DataSource: Send + Sync + Debug {
         &self,
         _projection: &ProjectionExec,
     ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
+
+    fn push_down_filter(

Review Comment:
Can you please document what the expectations for this function are? Specifically, I think it would be great to answer the following questions:
1. If this returns `Some(..)`, does that *guarantee* that any rows that don't pass the filter are filtered out? Or is it just a best effort?
2. Should the plan recursively push the filter into its input? (I think the answer is yes.)

########## datafusion/physical-expr-common/src/physical_expr.rs: ##########
@@ -283,6 +284,47 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
     /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
     ///
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
+
+    /// Take a snapshot of this `PhysicalExpr` if it is dynamic.
+    /// This is used to capture the current state of `PhysicalExpr`s that may contain
+    /// dynamic references to other operators in order to serialize it over the wire
+    /// or treat it via downcast matching.
+    ///
+    /// You should not call this method directly as it does not handle recursion.
+    /// Instead use `shapshot_physical_expr` to handle recursion and capture the

Review Comment:
I think if you do it like this:
```suggestion
    /// Instead use [`shapshot_physical_expr`] to handle recursion and capture the
```
rustdoc will add a link automatically for you.

########## datafusion/physical-expr/src/utils/mod.rs: ##########
@@ -47,6 +47,22 @@ pub fn split_conjunction(
     split_impl(Operator::And, predicate, vec![])
 }

+/// Create a conjunction of the given predicates.
+/// If the input is empty, return a literal true.

Review Comment:
😍

########## datafusion/physical-plan/src/dynamic_filters.rs: ##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    hash::Hash,
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result,
+};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr::{expressions::lit, utils::conjunction, PhysicalExpr};
+
+/// A source of dynamic runtime filters.
+///
+/// During query execution, operators implementing this trait can provide
+/// filter expressions that other operators can use to dynamically prune data.
+///
+/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs for examples.
+pub trait DynamicFilterSource: Send + Sync + std::fmt::Debug + 'static {
+    /// Take a snapshot of the current state of filtering, returning a non-dynamic PhysicalExpr.
+    /// This is used to e.g. serialize dynamic filters across the wire or to pass them into systems
+    /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete types of the expressions like `PruningPredicate` does).
+    /// For example, it is expected that this returns a relatively simple expression such as `col1 > 5` for a TopK operator or
+    /// `col2 IN (1, 2, ... N)` for a HashJoin operator.
+    fn snapshot_current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>>;
+}
+
+#[derive(Debug)]
+pub struct DynamicFilterPhysicalExpr {
+    /// The children of this expression.
+    /// In particular, it is important that if the dynamic expression will reference any columns
+    /// those columns be marked as children of this expression so that the expression can be properly
+    /// bound to the schema.
+    children: Vec<Arc<dyn PhysicalExpr>>,
+    /// Remapped children, if `PhysicalExpr::with_new_children` was called.
+    /// This is used to ensure that the children of the expression are always the same
+    /// as the children of the dynamic filter source.
+    remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
+    /// The source of dynamic filters.
+    inner: Arc<dyn DynamicFilterSource>,
+    /// For testing purposes track the data type and nullability to make sure they don't change.
+    /// If they do, there's a bug in the implementation.
+    /// But this can have overhead in production, so it's only included in tests.
+    data_type: Arc<RwLock<Option<arrow::datatypes::DataType>>>,
+    nullable: Arc<RwLock<Option<bool>>>,
+}
+
+impl std::fmt::Display for DynamicFilterPhysicalExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DynamicFilterPhysicalExpr")
+    }
+}
+
+// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808
+impl PartialEq for DynamicFilterPhysicalExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.current().eq(&other.current())
+    }
+}
+
+impl Eq for DynamicFilterPhysicalExpr {}
+
+impl Hash for DynamicFilterPhysicalExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.current().hash(state)
+    }
+}
+
+impl DynamicFilterPhysicalExpr {
+    pub fn new(
+        children: Vec<Arc<dyn PhysicalExpr>>,
+        inner: Arc<dyn DynamicFilterSource>,
+    ) -> Self {
+        Self {
+            children,
+            remapped_children: None,
+            inner,
+            data_type: Arc::new(RwLock::new(None)),
+            nullable: Arc::new(RwLock::new(None)),
+        }
+    }
+
+    fn current(&self) -> Arc<dyn PhysicalExpr> {
+        let current = if let Ok(current) = self.inner.snapshot_current_filters() {

Review Comment:
I think it is important to describe here in comments why it is `AND`ing the filters together.
Specifically, I think the reason is that when there are multiple TopK nodes, each contributes a potentially different range as they have different TopK heaps.

########## datafusion/physical-optimizer/src/pruning.rs: ##########
@@ -527,6 +529,7 @@ impl PruningPredicate {
     /// See the struct level documentation on [`PruningPredicate`] for more
     /// details.
     pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
+        let expr = snasphot_physical_expr(expr)?;

Review Comment:
This is good context to add as a comment, I think:
```suggestion
        // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`
        // which does not handle dynamic exprs in general
        let expr = snasphot_physical_expr(expr)?;
```

########## datafusion/physical-plan/src/sorts/sort.rs: ##########
@@ -1197,35 +1197,55 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

-        let mut input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let execution_options = &context.session_config().options().execution;
-
-        trace!("End SortExec's input.execute for partition: {}", partition);
-
         let sort_satisfied = self
             .input
             .equivalence_properties()
             .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));

+        let input_exec = Arc::clone(&self.input);
+
+        let execution_options = &context.session_config().options().execution;
+
+        trace!("End SortExec's input.execute for partition: {}", partition);
+
         match (sort_satisfied, self.fetch.as_ref()) {
-            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
-                input,
-                0,
-                Some(*fetch),
-                BaselineMetrics::new(&self.metrics_set, partition),
-            ))),
-            (true, None) => Ok(input),
+            (true, Some(fetch)) => {
+                let input = input_exec.execute(partition, Arc::clone(&context))?;
+                Ok(Box::pin(LimitStream::new(
+                    input,
+                    0,
+                    Some(*fetch),
+                    BaselineMetrics::new(&self.metrics_set, partition),
+                )))
+            }
+            (true, None) => self.input.execute(partition, Arc::clone(&context)),
             (false, Some(fetch)) => {
+                let schema = input_exec.schema();
                 let mut topk = TopK::try_new(
                     partition,
-                    input.schema(),
+                    schema,
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
                 )?;
+                let input_exec = if context
+                    .session_config()
+                    .options()
+                    .optimizer
+                    .enable_dynamic_filter_pushdown
+                {
+                    // Try to push down the dynamic filter. If the execution plan doesn't
+                    // support it, push_down_filter will return None and we'll
+                    // keep the original input_exec.
+                    input_exec

Review Comment:
I still find it very strange that this pushdown happens *after* `execute` gets called -- would you be open to a PR that tries to do this with an optimizer pass?

Apart from keeping the optimization and execution separate, which I think will help overall understandability, doing the pushdown as an optimizer pass will make the filters visible in explain plans (and thus easier to verify that it is happening as expected).
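To make the optimizer-pass suggestion concrete, here is a rough, hypothetical sketch of such a rule. It assumes the PR's `push_down_filter` API, and the `SortExec::dynamic_filter()` accessor it calls is made up: the open design question is exactly that the dynamic filter is currently created inside `execute`, so `SortExec` would have to build its `DynamicFilterPhysicalExpr` at plan time for a rule like this to hand the same instance to the operators below.

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::ExecutionPlan;

/// Hypothetical rule that pushes TopK dynamic filters down at plan time,
/// so they appear in `EXPLAIN` output instead of only existing at execute time.
#[derive(Debug, Default)]
struct PushdownDynamicFilters;

impl PhysicalOptimizerRule for PushdownDynamicFilters {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_down(|node| {
            // Only a SortExec with a fetch (i.e. a TopK) produces a dynamic filter.
            let Some(sort) = node.as_any().downcast_ref::<SortExec>() else {
                return Ok(Transformed::no(node));
            };
            if sort.fetch().is_none() {
                return Ok(Transformed::no(node));
            }
            // Hypothetical accessor: the `DynamicFilterPhysicalExpr` the sort's
            // TopK heaps will later update, created when the plan was built.
            let filter = sort.dynamic_filter();
            match sort.input().push_down_filter(filter)? {
                // The input accepted the filter: rebuild the sort on top of it.
                Some(new_input) => {
                    let new_sort = SortExec::new(sort.expr().clone(), new_input)
                        .with_fetch(sort.fetch())
                        .with_preserve_partitioning(sort.preserve_partitioning());
                    Ok(Transformed::yes(Arc::new(new_sort) as Arc<dyn ExecutionPlan>))
                }
                // No operator below could absorb it: leave the plan unchanged.
                None => Ok(Transformed::no(node)),
            }
        })
        .data()
    }

    fn name(&self) -> &str {
        "PushdownDynamicFilters"
    }

    fn schema_check(&self) -> bool {
        // Pushing a filter down never changes the plan's schema.
        true
    }
}
```

Because such a rule runs before execution, the pushed-down filter would be visible in `EXPLAIN` output, which addresses the verifiability point raised in the last comment.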