alamb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2021619914


##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }
 
     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
         self.pruning_predicate.as_ref()
     }
 
     /// Optional reference to this parquet scan's page pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]

Review Comment:
   ```suggestion
       #[deprecated(note = "ParquetSource no longer constructs a 
PruningPredicate.")]
   ```



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }
 
     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]

Review Comment:
   ```suggestion
       #[deprecated(note = "ParqueSource no longer constructs a 
PruningPredicate.")]
   ```



##########
datafusion/physical-plan/src/dynamic_filters.rs:
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    hash::Hash,
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result,
+};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr::{expressions::lit, utils::conjunction, PhysicalExpr};
+
+/// A source of dynamic runtime filters.
+///
+/// During query execution, operators implementing this trait can provide
+/// filter expressions that other operators can use to dynamically prune data.
+///
+/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs for examples.
+pub trait DynamicFilterSource: Send + Sync + std::fmt::Debug + 'static {
+    /// Take a snapshot of the current state of filtering, returning a non-dynamic PhysicalExpr.
+    /// This is used to e.g. serialize dynamic filters across the wire or to pass them into systems
+    /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete types of the expressions like `PruningPredicate` does).
+    /// For example, it is expected that this returns a relatively simple expression such as `col1 > 5` for a TopK operator or
+    /// `col2 IN (1, 2, ... N)` for a HashJoin operator.
+    fn snapshot_current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>>;
+}
+
+#[derive(Debug)]
+pub struct DynamicFilterPhysicalExpr {

Review Comment:
   Could you add some brief context about what a `DynamicFilterPhysicalExpr` is?
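   
   Maybe something along these lines (just a sketch of my understanding from reading this PR -- please correct anything I have wrong):
   
   ```rust
   /// A `PhysicalExpr` that wraps a [`DynamicFilterSource`], letting the
   /// source's current filters be used anywhere a `PhysicalExpr` is expected
   /// (e.g. when building a `PruningPredicate`).
   ///
   /// The underlying source can change while the query runs (e.g. as a TopK
   /// heap fills up), so each use takes a fresh snapshot of the source's
   /// current filters and `AND`s them together.
   #[derive(Debug)]
   pub struct DynamicFilterPhysicalExpr {
   ```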



##########
datafusion/physical-plan/src/filter.rs:
##########
@@ -433,6 +433,22 @@ impl ExecutionPlan for FilterExec {
         }
         try_embed_projection(projection, self)
     }
+
+    fn push_down_filter(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let mut input = Arc::clone(&self.input);
+        if let Some(new_input) = input.push_down_filter(Arc::clone(&expr))? {

Review Comment:
   If the filter was able to be pushed down into the filter's input, it seems like it would be more efficient to not *ALSO* evaluate it in the `FilterExec` itself.
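   
   Something like this, perhaps? (Rough sketch only -- it ignores `FilterExec`'s projection and selectivity settings, and it assumes a successful pushdown *guarantees* the filter is applied, which is exactly the contract question I asked on the `DataSource` trait.)
   
   ```rust
   fn push_down_filter(
       &self,
       expr: Arc<dyn PhysicalExpr>,
   ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
       if let Some(new_input) = self.input.push_down_filter(Arc::clone(&expr))? {
           // The input now enforces `expr`, so rebuild this FilterExec with
           // only its original predicate instead of re-evaluating `expr` too.
           return Ok(Some(Arc::new(FilterExec::try_new(
               Arc::clone(&self.predicate),
               new_input,
           )?)));
       }
       Ok(None)
   }
   ```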



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1224,6 +1245,28 @@ impl ExecutionPlan for SortExec {
                 .with_preserve_partitioning(self.preserve_partitioning()),
         )))
     }
+
+    // Pass through filter pushdown.
+    // This often happens in partitioned plans with a TopK because we end up with 1 TopK per partition + a final TopK at the end.
+    // Implementing this pass-through allows the global/final TopK to push down filters to the partitions.
+    fn push_down_filter(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {

Review Comment:
   I agree it doesn't need to be done in the first PR -- but we should probably track it as follow-on work.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }
 
     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
         self.pruning_predicate.as_ref()

Review Comment:
   I suggest we change this code to always return `None` (and remove the page pruning predicate from the source).
   
   Otherwise I predict that we'll end up with this field slowly bitrotting (aka it stops working but isn't covered by tests).
   
   ```suggestion
           None
   ```
   
   I suggest the same thing for `page_pruning_predicate`.



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -349,11 +337,13 @@ impl ParquetSource {
     }
 
     /// Optional reference to this parquet scan's pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
         self.pruning_predicate.as_ref()
     }
 
     /// Optional reference to this parquet scan's page pruning predicate
+    #[deprecated(note = "ParquetDataSource no longer constructs a PruningPredicate.")]
     pub fn page_pruning_predicate(&self) -> Option<&Arc<PagePruningAccessPlanFilter>> {
         self.page_pruning_predicate.as_ref()

Review Comment:
   ```suggestion
           None
   ```



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -537,11 +525,10 @@ impl FileSource for ParquetSource {
             .expect("projected_statistics must be set");
         // When filters are pushed down, we have no way of knowing the exact statistics.
         // Note that pruning predicate is also a kind of filter pushdown.
-        // (bloom filters use `pruning_predicate` too)
-        if self.pruning_predicate().is_some()
-            || self.page_pruning_predicate().is_some()
-            || (self.predicate().is_some() && self.pushdown_filters())
-        {
+        // (bloom filters use `pruning_predicate` too).
+        // Because filter pushdown may happen dynamically as long as there is a predicate

Review Comment:
   👍 



##########
datafusion/datasource/src/source.rs:
##########
@@ -79,6 +79,13 @@ pub trait DataSource: Send + Sync + Debug {
         &self,
         _projection: &ProjectionExec,
     ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
+
+    fn push_down_filter(

Review Comment:
   Can you please document what the expectations for this function are? Specifically, I think it would be great to answer the following questions:
   
   1. If this returns `Some(..)` does that *guarantee* that any rows that don't pass the filter are filtered out? Or is it just a best effort?
   2. Should the plan recursively push the filter into its input? (I think the answer is yes.) A strawman sketch of possible wording is below.
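   
   (Strawman only -- the answers embedded below are my guesses and need to be confirmed against the intended contract:)
   
   ```rust
   /// Try to push the filter `expr` down into this `DataSource`.
   ///
   /// Returns `Some(..)` with a new plan if this source can make use of the
   /// filter, or `None` if it cannot.
   ///
   /// `Some(..)` is best-effort only: rows that do not satisfy `expr` may
   /// still be produced, so callers must still apply the filter themselves.
   /// Implementations that wrap another plan should recursively offer `expr`
   /// to their input.
   fn push_down_filter(
   ```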



##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -283,6 +284,47 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
     /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
     ///
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
+
+    /// Take a snapshot of this `PhysicalExpr` if it is dynamic.
+    /// This is used to capture the current state of `PhysicalExpr`s that may contain
+    /// dynamic references to other operators in order to serialize it over the wire
+    /// or treat it via downcast matching.
+    ///
+    /// You should not call this method directly as it does not handle recursion.
+    /// Instead use `shapshot_physical_expr` to handle recursion and capture the

Review Comment:
   I think if you do it like this:
   
   ```suggestion
       /// Instead use [`shapshot_physical_expr`] to handle recursion and capture the
   ```
   
   rustdoc will add a link automatically for you



##########
datafusion/physical-expr/src/utils/mod.rs:
##########
@@ -47,6 +47,22 @@ pub fn split_conjunction(
     split_impl(Operator::And, predicate, vec![])
 }
 
+/// Create a conjunction of the given predicates.
+/// If the input is empty, return a literal true.

Review Comment:
   😍 



##########
datafusion/physical-plan/src/dynamic_filters.rs:
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    hash::Hash,
+    sync::{Arc, RwLock},
+};
+
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result,
+};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr::{expressions::lit, utils::conjunction, PhysicalExpr};
+
+/// A source of dynamic runtime filters.
+///
+/// During query execution, operators implementing this trait can provide
+/// filter expressions that other operators can use to dynamically prune data.
+///
+/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs for examples.
+pub trait DynamicFilterSource: Send + Sync + std::fmt::Debug + 'static {
+    /// Take a snapshot of the current state of filtering, returning a non-dynamic PhysicalExpr.
+    /// This is used to e.g. serialize dynamic filters across the wire or to pass them into systems
+    /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete types of the expressions like `PruningPredicate` does).
+    /// For example, it is expected that this returns a relatively simple expression such as `col1 > 5` for a TopK operator or
+    /// `col2 IN (1, 2, ... N)` for a HashJoin operator.
+    fn snapshot_current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>>;
+}
+
+#[derive(Debug)]
+pub struct DynamicFilterPhysicalExpr {
+    /// The children of this expression.
+    /// In particular, it is important that if the dynamic expression will reference any columns
+    /// those columns be marked as children of this expression so that the expression can be properly
+    /// bound to the schema.
+    children: Vec<Arc<dyn PhysicalExpr>>,
+    /// Remapped children, if `PhysicalExpr::with_new_children` was called.
+    /// This is used to ensure that the children of the expression are always the same
+    /// as the children of the dynamic filter source.
+    remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
+    /// The source of dynamic filters.
+    inner: Arc<dyn DynamicFilterSource>,
+    /// For testing purposes track the data type and nullability to make sure they don't change.
+    /// If they do, there's a bug in the implementation.
+    /// But this can have overhead in production, so it's only included in tests.
+    data_type: Arc<RwLock<Option<arrow::datatypes::DataType>>>,
+    nullable: Arc<RwLock<Option<bool>>>,
+}
+
+impl std::fmt::Display for DynamicFilterPhysicalExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DynamicFilterPhysicalExpr")
+    }
+}
+
+// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808
+impl PartialEq for DynamicFilterPhysicalExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.current().eq(&other.current())
+    }
+}
+
+impl Eq for DynamicFilterPhysicalExpr {}
+
+impl Hash for DynamicFilterPhysicalExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.current().hash(state)
+    }
+}
+
+impl DynamicFilterPhysicalExpr {
+    pub fn new(
+        children: Vec<Arc<dyn PhysicalExpr>>,
+        inner: Arc<dyn DynamicFilterSource>,
+    ) -> Self {
+        Self {
+            children,
+            remapped_children: None,
+            inner,
+            data_type: Arc::new(RwLock::new(None)),
+            nullable: Arc::new(RwLock::new(None)),
+        }
+    }
+
+    fn current(&self) -> Arc<dyn PhysicalExpr> {
+        let current = if let Ok(current) = self.inner.snapshot_current_filters() {

Review Comment:
   I think it is important to describe here in comments why it is `AND`ing the filters together.
   
   Specifically, I think the reason is that when there are multiple TopK nodes, each contributes a potentially different range as they have different topk heaps. Maybe something like the sketch below.
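   
   (I am guessing from the imports that the body `AND`s the snapshots via `conjunction`; adjust to the actual code.)
   
   ```rust
   let current = if let Ok(current) = self.inner.snapshot_current_filters() {
       // Several operators may contribute snapshots (e.g. multiple TopK
       // nodes, each with its own heap and therefore its own range), so
       // `AND`ing them keeps only rows that can still matter to all of them.
       conjunction(current)
   ```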



##########
datafusion/physical-optimizer/src/pruning.rs:
##########
@@ -527,6 +529,7 @@ impl PruningPredicate {
     /// See the struct level documentation on [`PruningPredicate`] for more
     /// details.
     pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
+        let expr = snasphot_physical_expr(expr)?;

Review Comment:
   This is good context to add as a comment, I think.
   
   ```suggestion
           // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`
           // which does not handle dynamic exprs in general
           let expr = snasphot_physical_expr(expr)?;
   ```



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1197,35 +1197,55 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id 
{} and task_id {:?}", partition, context.session_id(), context.task_id());
 
-        let mut input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let execution_options = &context.session_config().options().execution;
-
-        trace!("End SortExec's input.execute for partition: {}", partition);
-
         let sort_satisfied = self
             .input
             .equivalence_properties()
             .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
 
+        let input_exec = Arc::clone(&self.input);
+
+        let execution_options = &context.session_config().options().execution;
+
+        trace!("End SortExec's input.execute for partition: {}", partition);
+
         match (sort_satisfied, self.fetch.as_ref()) {
-            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
-                input,
-                0,
-                Some(*fetch),
-                BaselineMetrics::new(&self.metrics_set, partition),
-            ))),
-            (true, None) => Ok(input),
+            (true, Some(fetch)) => {
+                let input = input_exec.execute(partition, Arc::clone(&context))?;
+                Ok(Box::pin(LimitStream::new(
+                    input,
+                    0,
+                    Some(*fetch),
+                    BaselineMetrics::new(&self.metrics_set, partition),
+                )))
+            }
+            (true, None) => self.input.execute(partition, Arc::clone(&context)),
             (false, Some(fetch)) => {
+                let schema = input_exec.schema();
                 let mut topk = TopK::try_new(
                     partition,
-                    input.schema(),
+                    schema,
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
                 )?;
+                let input_exec = if context
+                    .session_config()
+                    .options()
+                    .optimizer
+                    .enable_dynamic_filter_pushdown
+                {
+                    // Try to push down the dynamic filter. If the execution plan doesn't
+                    // support it, push_down_filter will return None and we'll
+                    // keep the original input_exec.
+                    input_exec

Review Comment:
   I still find it very strange that this pushdown happens *after* `execute` gets called -- would you be open to a PR that tries to do this with an optimizer pass?
   
   Apart from keeping optimization and execution separate, which I think will help overall understandability, doing the pushdown as an optimizer pass would make the filters visible in explain plans (and thus easier to verify that the pushdown is happening as expected).
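   
   To sketch the shape I have in mind (the rule name and wiring are hypothetical, and this glosses over how the TopK dynamic filter expression gets created and shared):
   
   ```rust
   use std::sync::Arc;
   
   use datafusion_common::{config::ConfigOptions, Result};
   use datafusion_physical_optimizer::PhysicalOptimizerRule;
   use datafusion_physical_plan::ExecutionPlan;
   
   /// Hypothetical rule that runs after plan construction, walks the plan
   /// once, and offers each TopK's dynamic filter to the operators below it,
   /// so the rewritten inputs (and filters) show up in `EXPLAIN` output.
   #[derive(Debug, Default)]
   pub struct PushdownDynamicFilters;
   
   impl PhysicalOptimizerRule for PushdownDynamicFilters {
       fn optimize(
           &self,
           plan: Arc<dyn ExecutionPlan>,
           _config: &ConfigOptions,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           // For each SortExec with a fetch (i.e. a TopK), build its dynamic
           // filter up front and call `push_down_filter` on its input,
           // replacing the input when the pushdown succeeds.
           todo!("sketch only")
       }
   
       fn name(&self) -> &str {
           "pushdown_dynamic_filters"
       }
   
       fn schema_check(&self) -> bool {
           true
       }
   }
   ```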



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

