This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7eb710d5a2 Introduce DynamicFilterSource and DynamicPhysicalExpr (#15568)
7eb710d5a2 is described below

commit 7eb710d5a2f768659e6ca49e71060e3642a88ea2
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Thu Apr 10 13:09:18 2025 -0500

    Introduce DynamicFilterSource and DynamicPhysicalExpr (#15568)
    
    * update
    
    * Add file
    
    * fix
    
    * Add remap children test
    
    * fmt
    
    * more comments
    
    * fmt
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    
    * Add some more comments
    
    * Update datafusion/physical-expr-common/src/physical_expr.rs
    
    Co-authored-by: Berkay Şahin <124376117+berkaysynn...@users.noreply.github.com>
    
    * Simplify trait to concrete impl
    
    * clippy
    
    * remap children in update()
    
    * better test
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    Co-authored-by: Berkay Şahin <124376117+berkaysynn...@users.noreply.github.com>
---
 Cargo.lock                                         |   1 +
 .../physical-expr-common/src/physical_expr.rs      |  77 ++++
 datafusion/physical-expr/Cargo.toml                |   1 +
 .../src/expressions/dynamic_filters.rs             | 474 +++++++++++++++++++++
 datafusion/physical-expr/src/expressions/mod.rs    |   1 +
 datafusion/physical-expr/src/lib.rs                |   2 +-
 datafusion/physical-expr/src/utils/mod.rs          |  25 ++
 datafusion/physical-optimizer/src/pruning.rs       |   4 +
 datafusion/proto/src/physical_plan/to_proto.rs     |   6 +-
 9 files changed, 589 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cc771331eb..50412bca5b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2421,6 +2421,7 @@ dependencies = [
  "half",
  "hashbrown 0.14.5",
  "indexmap 2.8.0",
+ "insta",
  "itertools 0.14.0",
  "log",
  "paste",
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs 
b/datafusion/physical-expr-common/src/physical_expr.rs
index 43f214607f..3bc41d2652 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -27,6 +27,7 @@ use arrow::array::BooleanArray;
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
 use datafusion_expr_common::columnar_value::ColumnarValue;
 use datafusion_expr_common::interval_arithmetic::Interval;
@@ -283,6 +284,55 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + 
DynEq + DynHash {
     /// See the [`fmt_sql`] function for an example of printing 
`PhysicalExpr`s as SQL.
     ///
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
+
+    /// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
+    ///
+    /// "Dynamic" in this case means containing references to structures that may change
+    /// during plan execution, such as hash tables.
+    ///
+    /// This method is used to capture the current state of `PhysicalExpr`s that may contain
+    /// dynamic references to other operators in order to serialize it over the wire
+    /// or treat it via downcast matching.
+    ///
+    /// You should not call this method directly as it does not handle recursion.
+    /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
+    /// full state of the `PhysicalExpr`.
+    ///
+    /// This is expected to return "simple" expressions that do not have mutable state
+    /// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
+    /// Callers however should *not* assume anything about the returned expressions
+    /// since callers and implementers may not agree on what "simple" or "built-in"
+    /// means.
+    /// In other words, if you need to serialize a `PhysicalExpr` across the wire
+    /// you should call this method and then try to serialize the result,
+    /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
+    /// just as if you had not called this method at all.
+    ///
+    /// In particular, consider:
+    /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
+    ///   that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
+    ///   This function may return something like `a >= 12`.
+    /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
+    ///   from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
+    ///   This function may return something like `t2.b IN (1, 5, 7)`.
+    ///
+    /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
+    /// or needs to serialize this state to bytes may not be able to handle these dynamic references.
+    /// In such cases, implementations should return a simplified version of the `PhysicalExpr`
+    /// that does not contain these dynamic references.
+    ///
+    /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
+    /// and send it across the wire to a remote executor, may want to call this method after
+    /// every batch on the source side and broadcast / update the current snapshot to the remote executor.
+    ///
+    /// # Returns
+    ///
+    /// `Ok(Some(expr))` with the snapshot if this expression is dynamic, or `Ok(None)`
+    /// (the default) if it has no dynamic references or state and can be used as-is.
+    ///
+    /// Note for implementers: this method should *not* handle recursion.
+    /// Recursion is handled in [`snapshot_physical_expr`].
+    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        // By default, we return None to indicate that this PhysicalExpr does not
+        // have any dynamic references or state.
+        // This is a safe default behavior.
+        Ok(None)
+    }
 }
 
 /// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must 
remain object
@@ -446,3 +496,30 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + 
'_ {
 
     Wrapper { expr }
 }
+
+/// Take a snapshot of the given `PhysicalExpr`, resolving any dynamic parts.
+///
+/// The expression tree is walked bottom-up and every node whose
+/// [`PhysicalExpr::snapshot`] returns `Some(..)` is replaced by that snapshot.
+/// This is used to capture the current state of `PhysicalExpr`s that may contain
+/// dynamic references to other operators in order to serialize it over the wire
+/// or treat it via downcast matching.
+///
+/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
+///
+/// # Returns
+///
+/// The snapshotted expression tree. Nodes without dynamic references or state
+/// (whose `snapshot` returns `None`) are kept as they are, so an expression with
+/// no dynamic parts comes back unchanged.
+///
+/// # Errors
+///
+/// Propagates any error returned by a node's [`PhysicalExpr::snapshot`].
+pub fn snapshot_physical_expr(
+    expr: Arc<dyn PhysicalExpr>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform_up(|e| {
+        if let Some(snapshot) = e.snapshot()? {
+            Ok(Transformed::yes(snapshot))
+        } else {
+            // `e` is owned by the closure, so it can be handed back without cloning.
+            Ok(Transformed::no(e))
+        }
+    })
+    .data()
+}
diff --git a/datafusion/physical-expr/Cargo.toml 
b/datafusion/physical-expr/Cargo.toml
index 97d028897b..47e3291e5c 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -57,6 +57,7 @@ petgraph = "0.7.1"
 arrow = { workspace = true, features = ["test_utils"] }
 criterion = { workspace = true }
 datafusion-functions = { workspace = true }
+insta = { workspace = true }
 rand = { workspace = true }
 rstest = { workspace = true }
 
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs 
b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
new file mode 100644
index 0000000000..c0a3285f0e
--- /dev/null
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -0,0 +1,474 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    fmt::Display,
+    hash::Hash,
+    sync::{Arc, RwLock},
+};
+
+use crate::PhysicalExpr;
+use arrow::datatypes::{DataType, Schema};
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result,
+};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
+
+/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
+///
+/// Copies created through [`PhysicalExpr::with_new_children`] share the same
+/// underlying expression (`inner`), so a call to
+/// [`DynamicFilterPhysicalExpr::update`] is observed by every copy, each of
+/// which applies its own children remapping in [`DynamicFilterPhysicalExpr::current`].
+#[derive(Debug)]
+pub struct DynamicFilterPhysicalExpr {
+    /// The original children of this PhysicalExpr, if any.
+    /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
+    /// and later remapped to the actual expressions that are being filtered.
+    /// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
+    children: Vec<Arc<dyn PhysicalExpr>>,
+    /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
+    /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
+    remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
+    /// The source of dynamic filters, shared (via `Arc`) with every copy
+    /// created by `with_new_children`.
+    inner: Arc<RwLock<Arc<dyn PhysicalExpr>>>,
+    /// The data type and nullability first observed for this expression, used to
+    /// detect a change in either (which would indicate a bug) after an `update()`.
+    ///
+    /// These fields exist in every build, but they are only read and written by
+    /// `#[cfg(test)]` checks; outside of tests they simply stay `None`.
+    data_type: Arc<RwLock<Option<DataType>>>,
+    nullable: Arc<RwLock<Option<bool>>>,
+}
+
+impl Hash for DynamicFilterPhysicalExpr {
+    /// Hashes exactly what [`PartialEq::eq`] compares: the current (remapped)
+    /// expression and the effective children (the remapped children if any,
+    /// otherwise the original ones). Hashing the raw `children` and
+    /// `remapped_children` fields separately would let two values that compare
+    /// equal produce different hashes, violating the `Hash`/`Eq` contract.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the current expression cannot be obtained.
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        let inner = self.current().expect("Failed to get current expression");
+        inner.dyn_hash(state);
+        let children = self.remapped_children.as_ref().unwrap_or(&self.children);
+        children.dyn_hash(state);
+    }
+}
+
+impl PartialEq for DynamicFilterPhysicalExpr {
+    /// Two filters are equal when their current (remapped) expressions are equal
+    /// and their effective children are equal.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the current expression of either side cannot be obtained.
+    fn eq(&self, other: &Self) -> bool {
+        let inner = self.current().expect("Failed to get current expression");
+        let other_inner = other.current().expect("Failed to get current expression");
+        let our_children = self.remapped_children.as_ref().unwrap_or(&self.children);
+        let other_children =
+            other.remapped_children.as_ref().unwrap_or(&other.children);
+        // Compare through `&dyn PhysicalExpr` so the comparison dispatches to the
+        // concrete expressions. Calling `dyn_eq` directly on the `Arc` would resolve
+        // to the blanket `DynEq` impl for `Arc<dyn PhysicalExpr>` itself, which tries
+        // to downcast the other (concrete) expression to an `Arc` and never matches.
+        inner.as_ref() == other_inner.as_ref() && our_children == other_children
+    }
+}
+
+impl Eq for DynamicFilterPhysicalExpr {}
+
+impl Display for DynamicFilterPhysicalExpr {
+    /// Renders the filter as `DynamicFilterPhysicalExpr [ <current expression> ]`.
+    ///
+    /// Panics if `current()` returns an error.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let current_expr = self.current().expect("Failed to get current expression");
+        write!(f, "DynamicFilterPhysicalExpr [ {} ]", current_expr)
+    }
+}
+
+impl DynamicFilterPhysicalExpr {
+    /// Create a new [`DynamicFilterPhysicalExpr`]
+    /// from an initial expression and a list of children.
+    /// The list of children is provided separately because
+    /// the initial expression may not have the same children.
+    /// For example, if the initial expression is just `true`
+    /// it will not reference any columns, but we may know that
+    /// we are going to replace this expression with a real one
+    /// that does reference certain columns.
+    /// In this case you **must** pass in the columns that will be
+    /// used in the final expression as children to this function
+    /// since DataFusion is generally not compatible with dynamic
+    /// *children* in expressions.
+    ///
+    /// To determine the children you can:
+    ///
+    /// - Use [`collect_columns`] to collect the columns from the expression.
+    /// - Use existing information, such as the sort columns in a `SortExec`.
+    ///
+    /// Generally the important bit is that the *leaf children that reference columns
+    /// do not change* since those will be used to determine what columns need to be read
+    /// or projected when evaluating the expression.
+    ///
+    /// [`collect_columns`]: crate::utils::collect_columns
+    #[allow(dead_code)] // Only used in tests for now
+    pub fn new(
+        children: Vec<Arc<dyn PhysicalExpr>>,
+        inner: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self {
+            children,
+            remapped_children: None, // Initially no remapped children
+            inner: Arc::new(RwLock::new(inner)),
+            data_type: Arc::new(RwLock::new(None)),
+            nullable: Arc::new(RwLock::new(None)),
+        }
+    }
+
+    /// Rewrite `expr` so that every node equal to one of the original `children`
+    /// is replaced by the child at the same position in `remapped_children`.
+    ///
+    /// If `remapped_children` is `None` (no `with_new_children` call has happened),
+    /// `expr` is returned unchanged.
+    fn remap_children(
+        children: &[Arc<dyn PhysicalExpr>],
+        remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let Some(remapped_children) = remapped_children else {
+            // Nothing was remapped, so `expr` is already in terms of our children.
+            return Ok(expr);
+        };
+        expr.transform_up(|child| {
+            // Swap any of our original children for the corresponding new child;
+            // leave every other node untouched.
+            match children.iter().position(|c| c.as_ref() == child.as_ref()) {
+                Some(pos) => Ok(Transformed::yes(Arc::clone(&remapped_children[pos]))),
+                None => Ok(Transformed::no(child)),
+            }
+        })
+        .data()
+    }
+
+    /// Get the current expression.
+    /// This will return the current expression with any children
+    /// remapped to match calls to [`PhysicalExpr::with_new_children`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an execution error if the lock guarding the shared expression cannot
+    /// be acquired, or any error raised while remapping the children.
+    pub fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
+        // Clone the `Arc` out of the guard so the read lock is released before
+        // the (potentially larger) tree rewrite below.
+        let inner = Arc::clone(&*self.inner.read().map_err(|_| {
+            datafusion_common::DataFusionError::Execution(
+                "Failed to acquire read lock for inner".to_string(),
+            )
+        })?);
+        Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)
+    }
+
+    /// Update the current expression.
+    ///
+    /// `new_expr` must be expressed in terms of the *original* children passed to
+    /// the constructor: any children of this expression must be a subset of them.
+    /// It is stored as-is in the state shared by every copy created through
+    /// [`PhysicalExpr::with_new_children`]; each copy maps the original children
+    /// onto its own children when [`Self::current`] is called.
+    ///
+    /// This should be called e.g.:
+    /// - When we've computed the build side's hash table in a HashJoinExec
+    /// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach.
+    ///
+    /// # Errors
+    ///
+    /// Returns an execution error if the write lock cannot be acquired.
+    #[allow(dead_code)] // Only used in tests for now
+    pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
+        // Do NOT remap `new_expr` with this handle's `remapped_children` here.
+        // `inner` is shared with every copy produced by `with_new_children`, and
+        // each copy remaps from the *original* children in `current()`. Storing an
+        // expression already remapped for one copy would leave it un-remappable
+        // (and pointing at the wrong columns) for every other copy.
+        let mut current = self.inner.write().map_err(|_| {
+            datafusion_common::DataFusionError::Execution(
+                "Failed to acquire write lock for inner".to_string(),
+            )
+        })?;
+        *current = new_expr;
+        Ok(())
+    }
+}
+
+impl PhysicalExpr for DynamicFilterPhysicalExpr {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the remapped children if `with_new_children` has been called,
+    /// otherwise the children passed to the constructor.
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        match &self.remapped_children {
+            Some(remapped) => remapped.iter().collect(),
+            None => self.children.iter().collect(),
+        }
+    }
+
+    /// Returns a new handle whose (remapped) children are `children`.
+    ///
+    /// The original children are kept so `current()` can map them onto the new
+    /// ones, and the inner expression plus the test-only type trackers are shared
+    /// with `self` (via `Arc::clone`), so an `update()` is observed by both handles.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let remapped = Self {
+            children: self.children.clone(),
+            remapped_children: Some(children),
+            inner: Arc::clone(&self.inner),
+            data_type: Arc::clone(&self.data_type),
+            nullable: Arc::clone(&self.nullable),
+        };
+        Ok(Arc::new(remapped))
+    }
+
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+        let res = self.current()?.data_type(input_schema)?;
+        // In tests, remember the first data type reported and treat any later
+        // change (e.g. introduced by `update()`) as a bug.
+        #[cfg(test)]
+        {
+            use datafusion_common::internal_err;
+            let mut recorded = self
+                .data_type
+                .write()
+                .expect("Failed to acquire write lock for data_type");
+            match &*recorded {
+                Some(existing) if existing != &res => {
+                    return internal_err!(
+                        "DynamicFilterPhysicalExpr data type has changed unexpectedly. \
+                        Expected: {existing:?}, Actual: {res:?}"
+                    );
+                }
+                Some(_) => {}
+                None => *recorded = Some(res.clone()),
+            }
+        }
+        Ok(res)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        let res = self.current()?.nullable(input_schema)?;
+        // In tests, remember the first nullability reported and treat any later
+        // change (e.g. introduced by `update()`) as a bug.
+        #[cfg(test)]
+        {
+            use datafusion_common::internal_err;
+            let mut recorded = self
+                .nullable
+                .write()
+                .expect("Failed to acquire write lock for nullable");
+            match *recorded {
+                Some(existing) if existing != res => {
+                    return internal_err!(
+                        "DynamicFilterPhysicalExpr nullability has changed unexpectedly. \
+                        Expected: {existing}, Actual: {res}"
+                    );
+                }
+                Some(_) => {}
+                None => *recorded = Some(res),
+            }
+        }
+        Ok(res)
+    }
+
+    fn evaluate(
+        &self,
+        batch: &arrow::record_batch::RecordBatch,
+    ) -> Result<ColumnarValue> {
+        let expr = self.current()?;
+        // In tests, refuse to evaluate once the nullability or data type has
+        // drifted from what was first observed.
+        #[cfg(test)]
+        {
+            let schema = batch.schema();
+            self.nullable(&schema)?;
+            self.data_type(&schema)?;
+        }
+        expr.evaluate(batch)
+    }
+
+    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self.current() {
+            Ok(inner) => inner.fmt_sql(f),
+            Err(_) => Err(std::fmt::Error),
+        }
+    }
+
+    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        // A dynamic filter always has state worth capturing: snapshot the
+        // current (remapped) expression.
+        self.current().map(Some)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::{
+        expressions::{col, lit, BinaryExpr},
+        utils::reassign_predicate_columns,
+    };
+    use arrow::{
+        array::RecordBatch,
+        datatypes::{DataType, Field, Schema},
+    };
+    use datafusion_common::ScalarValue;
+
+    use super::*;
+
+    #[test]
+    fn test_remap_children() {
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+        // Start with `a = 42`, expressed against the table schema.
+        let initial = Arc::new(BinaryExpr::new(
+            col("a", &table_schema).unwrap(),
+            datafusion_expr::Operator::Eq,
+            lit(42) as Arc<dyn PhysicalExpr>,
+        )) as Arc<dyn PhysicalExpr>;
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![col("a", &table_schema).unwrap()],
+            initial,
+        ));
+        // Simulate two `ParquetSource` files with different filter schemas.
+        // Both must keep seeing the same inner `PhysicalExpr`, even after
+        // `update()`, while remapping their children independently.
+        let filter_schema_1 = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]));
+        let filter_schema_2 = Arc::new(Schema::new(vec![
+            Field::new("b", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+        // Each source calls `with_new_children` on the shared filter (through
+        // `reassign_predicate_columns`) to adapt it to its own schema.
+        let remap_to = |schema: &Arc<Schema>| {
+            reassign_predicate_columns(
+                Arc::clone(&dynamic_filter) as Arc<dyn PhysicalExpr>,
+                schema,
+                false,
+            )
+            .unwrap()
+        };
+        let dynamic_filter_1 = remap_to(&filter_schema_1);
+        let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
+        insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+        let dynamic_filter_2 = remap_to(&filter_schema_2);
+        let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
+        insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+
+        // Same logical row (a = 42, b = 43) laid out in each filter schema.
+        let batch_1 = RecordBatch::try_new(
+            Arc::clone(&filter_schema_1),
+            vec![
+                // a
+                ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(),
+                // b
+                ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(),
+            ],
+        )
+        .unwrap();
+        let batch_2 = RecordBatch::try_new(
+            Arc::clone(&filter_schema_2),
+            vec![
+                // b
+                ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(),
+                // a
+                ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(),
+            ],
+        )
+        .unwrap();
+
+        // Evaluate `expr` on `batch`, insisting on an array result.
+        let eval_array = |expr: &Arc<dyn PhysicalExpr>, batch: &RecordBatch| {
+            match expr.evaluate(batch).unwrap() {
+                ColumnarValue::Array(array) => array,
+                _ => panic!("Expected ColumnarValue::Array"),
+            }
+        };
+        let expected = |value: bool| {
+            ScalarValue::Boolean(Some(value))
+                .to_array_of_size(1)
+                .unwrap()
+        };
+
+        // Both remapped filters must agree on the initial expression.
+        let arr_1 = eval_array(&dynamic_filter_1, &batch_1);
+        let arr_2 = eval_array(&dynamic_filter_2, &batch_2);
+        assert!(arr_1.eq(&arr_2));
+        assert!(arr_1.eq(&expected(true)));
+
+        // Update the *original* filter; the change must be reflected in both
+        // derived filters.
+        let new_expr = Arc::new(BinaryExpr::new(
+            col("a", &table_schema).unwrap(),
+            datafusion_expr::Operator::Gt,
+            lit(43) as Arc<dyn PhysicalExpr>,
+        ));
+        dynamic_filter
+            .update(new_expr as Arc<dyn PhysicalExpr>)
+            .expect("Failed to update expression");
+
+        let arr_1 = eval_array(&dynamic_filter_1, &batch_1);
+        let arr_2 = eval_array(&dynamic_filter_2, &batch_2);
+        assert!(arr_1.eq(&arr_2));
+        assert!(arr_1.eq(&expected(false)));
+    }
+
+    #[test]
+    fn test_snapshot() {
+        let initial = lit(42) as Arc<dyn PhysicalExpr>;
+        let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&initial));
+
+        // A snapshot captures whatever expression is current at that moment.
+        assert_eq!(dynamic_filter.snapshot().unwrap(), Some(initial));
+
+        // After an update, a new snapshot reflects the new expression.
+        let updated = lit(100) as Arc<dyn PhysicalExpr>;
+        dynamic_filter.update(Arc::clone(&updated)).unwrap();
+        assert_eq!(dynamic_filter.snapshot().unwrap(), Some(updated));
+    }
+
+    #[test]
+    fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() {
+        let dynamic_filter =
+            DynamicFilterPhysicalExpr::new(vec![], lit(42) as Arc<dyn PhysicalExpr>);
+        let schema = Schema::empty();
+
+        // The first calls record the initial data type and nullability.
+        let initial_data_type = dynamic_filter.data_type(&schema).unwrap();
+        let initial_nullable = dynamic_filter.nullable(&schema).unwrap();
+
+        // Repeated calls must agree with what was recorded.
+        assert_eq!(
+            initial_data_type,
+            dynamic_filter.data_type(&schema).unwrap(),
+            "Data type should not change on second call."
+        );
+        assert_eq!(
+            initial_nullable,
+            dynamic_filter.nullable(&schema).unwrap(),
+            "Nullability should not change on second call."
+        );
+
+        // Swap in an expression with a different type and nullability.
+        dynamic_filter
+            .update(lit(ScalarValue::Utf8(None)) as Arc<dyn PhysicalExpr>)
+            .expect("Failed to update expression");
+
+        // data_type, nullable and evaluate must all report the drift as an error.
+        assert!(
+            dynamic_filter.data_type(&schema).is_err(),
+            "Expected err when data_type is called after changing the expression."
+        );
+        assert!(
+            dynamic_filter.nullable(&schema).is_err(),
+            "Expected err when nullable is called after changing the expression."
+        );
+        let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
+        assert!(
+            dynamic_filter.evaluate(&batch).is_err(),
+            "Expected err when evaluate is called after changing the expression."
+        );
+    }
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index f00b49f503..d77207fbbc 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -22,6 +22,7 @@ mod binary;
 mod case;
 mod cast;
 mod column;
+mod dynamic_filters;
 mod in_list;
 mod is_not_null;
 mod is_null;
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index 93ced2eb62..9f795c81fa 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -68,7 +68,7 @@ pub use planner::{create_physical_expr, 
create_physical_exprs};
 pub use scalar_function::ScalarFunctionExpr;
 
 pub use datafusion_physical_expr_common::utils::reverse_order_bys;
-pub use utils::split_conjunction;
+pub use utils::{conjunction, conjunction_opt, split_conjunction};
 
 // For backwards compatibility
 pub mod tree_node {
diff --git a/datafusion/physical-expr/src/utils/mod.rs 
b/datafusion/physical-expr/src/utils/mod.rs
index 7e4c7f0e10..b4d0758fd2 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -47,6 +47,31 @@ pub fn split_conjunction(
     split_impl(Operator::And, predicate, vec![])
 }
 
+/// Combine `predicates` with `AND`.
+///
+/// Returns a literal `true` when `predicates` is empty, the predicate itself
+/// when there is exactly one, and `a AND b AND c ...` otherwise.
+/// See [`conjunction_opt`] for a variant that returns `None` on empty input.
+pub fn conjunction(
+    predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
+) -> Arc<dyn PhysicalExpr> {
+    match conjunction_opt(predicates) {
+        Some(combined) => combined,
+        None => crate::expressions::lit(true),
+    }
+}
+
+/// Combine `predicates` with `AND`, returning `None` if there are none.
+///
+/// - If the input is empty, return `None`.
+/// - If the input contains a single predicate, return `Some(predicate)`.
+/// - Otherwise, return `Some(..)` of the predicates joined with `AND` in input
+///   order, nested to the left (e.g. `Some(a AND b AND c)`, i.e. `(a AND b) AND c`).
+pub fn conjunction_opt(
+    predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
+) -> Option<Arc<dyn PhysicalExpr>> {
+    // `reduce` yields `None` for an empty iterator and the sole element for a
+    // single-element one, which is exactly the contract documented above.
+    predicates.into_iter().reduce(|acc, predicate| {
+        Arc::new(BinaryExpr::new(acc, Operator::And, predicate)) as _
+    })
+}
+
 /// Assume the predicate is in the form of DNF, split the predicate to a Vec 
of PhysicalExprs.
 ///
 /// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1 
<= b2", "c1 != c2"]
diff --git a/datafusion/physical-optimizer/src/pruning.rs 
b/datafusion/physical-optimizer/src/pruning.rs
index 42d08da20b..1dd168f181 100644
--- a/datafusion/physical-optimizer/src/pruning.rs
+++ b/datafusion/physical-optimizer/src/pruning.rs
@@ -41,6 +41,7 @@ use datafusion_common::{Column, DFSchema};
 use datafusion_expr_common::operator::Operator;
 use datafusion_physical_expr::utils::{collect_columns, Guarantee, 
LiteralGuarantee};
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
+use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
 use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
 
 /// A source of runtime statistical information to [`PruningPredicate`]s.
@@ -527,6 +528,9 @@ impl PruningPredicate {
     /// See the struct level documentation on [`PruningPredicate`] for more
     /// details.
     pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> 
Result<Self> {
+        // Get a (simpler) snapshot of the physical expr here to use with 
`PruningPredicate`
+        // which does not handle dynamic exprs  in general
+        let expr = snapshot_physical_expr(expr)?;
         let unhandled_hook = 
Arc::new(ConstantUnhandledPredicateHook::default()) as _;
 
         // build predicate expression once
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index f6546ff3f2..af7800d6fe 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -22,6 +22,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::datasource::physical_plan::FileSink;
 use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, 
StandardWindowExpr};
 use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, 
ScalarFunctionExpr};
+use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr;
 use datafusion::physical_plan::expressions::{
     BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, 
IsNullExpr,
     Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
@@ -210,6 +211,9 @@ pub fn serialize_physical_expr(
     value: &Arc<dyn PhysicalExpr>,
     codec: &dyn PhysicalExtensionCodec,
 ) -> Result<protobuf::PhysicalExprNode> {
+    // Snapshot the expr in case it has dynamic predicate state so
+    // it can be serialized
+    let value = snapshot_physical_expr(Arc::clone(value))?;
     let expr = value.as_any();
 
     if let Some(expr) = expr.downcast_ref::<Column>() {
@@ -368,7 +372,7 @@ pub fn serialize_physical_expr(
         })
     } else {
         let mut buf: Vec<u8> = vec![];
-        match codec.try_encode_expr(value, &mut buf) {
+        match codec.try_encode_expr(&value, &mut buf) {
             Ok(_) => {
                 let inputs: Vec<protobuf::PhysicalExprNode> = value
                     .children()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to