berkaysynnada commented on code in PR #15568: URL: https://github.com/apache/datafusion/pull/15568#discussion_r2035353055
########## datafusion/physical-expr-common/src/physical_expr.rs: ########## @@ -283,6 +284,55 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL. /// fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result; + + /// Take a snapshot of this `PhysicalExpr`, if it is dynamic. + /// + /// "Dynamic" in this case means containing references to structures that may change + /// during plan execution, such as hash tables. + /// + /// This method is used to capture the current state of `PhysicalExpr`s that may contain + /// dynamic references to other operators in order to serialize it over the wire + /// or treat it via downcast matching. + /// + /// You should not call this method directly as it does not handle recursion. + /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the + /// full state of the `PhysicalExpr`. + /// + /// This is expected to return "simple" expressions that do not have mutable state + /// and are composed of DataFusion's built-in `PhysicalExpr` implementations. + /// Callers however should *not* assume anything about the returned expressions + /// since callers and implementers may not agree on what "simple" or "built-in" + /// means. + /// In other words, if you need to searlize a `PhysicalExpr` across the wire Review Comment: ```suggestion /// In other words, if you need to serialize a `PhysicalExpr` across the wire ``` ########## datafusion/physical-expr/src/expressions/dynamic_filters.rs: ########## @@ -0,0 +1,442 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + any::Any, + fmt::Display, + hash::Hash, + sync::{Arc, RwLock}, +}; + +use crate::{utils::conjunction, PhysicalExpr}; +use arrow::datatypes::{DataType, Schema}; +use datafusion_common::{ + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, +}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; + +/// A source of dynamic runtime filters. +/// +/// Operators can create implementations of this trait that get wrapped +/// in [`DynamicFilterPhysicalExpr`] to be pushed down into and through to scans, joins, etc. +/// +/// For example: +/// - A `HashJoin` operator can use this to provide a filter expression +/// that filters out rows from the right side of the join based on the +/// values from the left side. +/// - A `TopK` operator can use this to provide a filter expression +/// that filters out rows from the input based on the values from the +/// top K rows. +/// +/// Initially this trait is intended to be only for internal use as a way to facilitate +/// building [`DynamicFilterPhysicalExpr`]s in the various operators that will be generating +/// dynamic filters. +/// Because of this we've made it a public trait in a private module so that it is only +/// accessible within the crate. +/// If you would like to use this trait in your own code, please open an issue +/// to discuss the use case and we can consider making it public. 
+pub trait DynamicFilterSource: + Send + Sync + std::fmt::Debug + DynEq + DynHash + Display + 'static +{ + /// Take a snapshot of the current state of filtering, returning a non-dynamic PhysicalExpr. + /// This is used to e.g. serialize dynamic filters across the wire or to pass them into systems + /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete types of the expressions like `PruningPredicate` does). + /// For example, it is expected that this returns a relatively simple expression such as `col1 > 5` for a TopK operator or + /// `col2 IN (1, 2, ... N)` for a HashJoin operator. + fn snapshot_current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>>; + + fn as_any(&self) -> &dyn Any; +} + +impl PartialEq for dyn DynamicFilterSource { + fn eq(&self, other: &Self) -> bool { + self.dyn_eq(other.as_any()) + } +} + +impl Eq for dyn DynamicFilterSource {} + +/// A wrapper around a [`DynamicFilterSource`] that allows it to be used as a physical expression. +/// This will call [`DynamicFilterSource::snapshot_current_filters`] to get the current filters for each call to +/// [`PhysicalExpr::evaluate`], [`PhysicalExpr::data_type`], and [`PhysicalExpr::nullable`]. +/// It also implements [`PhysicalExpr::snapshot`] by forwarding the call to [`DynamicFilterSource::snapshot_current_filters`]. +#[derive(Debug)] +pub struct DynamicFilterPhysicalExpr { Review Comment: Is this going to be the shared object between sources and dynamic filter introducing operators? ########## datafusion/physical-expr/src/expressions/dynamic_filters.rs: ########## @@ -0,0 +1,442 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + any::Any, + fmt::Display, + hash::Hash, + sync::{Arc, RwLock}, +}; + +use crate::{utils::conjunction, PhysicalExpr}; +use arrow::datatypes::{DataType, Schema}; +use datafusion_common::{ + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, +}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; + +/// A source of dynamic runtime filters. +/// +/// Operators can create implementations of this trait that get wrapped +/// in [`DynamicFilterPhysicalExpr`] to be pushed down into and through to scans, joins, etc. +/// +/// For example: +/// - A `HashJoin` operator can use this to provide a filter expression +/// that filters out rows from the right side of the join based on the +/// values from the left side. +/// - A `TopK` operator can use this to provide a filter expression +/// that filters out rows from the input based on the values from the +/// top K rows. +/// +/// Initially this trait is intended to be only for internal use as a way to facilitate +/// building [`DynamicFilterPhysicalExpr`]s in the various operators that will be generating +/// dynamic filters. +/// Because of this we've made it a public trait in a private module so that it is only +/// accessible within the crate. +/// If you would like to use this trait in your own code, please open an issue +/// to discuss the use case and we can consider making it public. 
+pub trait DynamicFilterSource: Review Comment: Will there be different structs for different filter-introducing streams, or will they all utilize a common struct? ########## datafusion/physical-expr-common/src/physical_expr.rs: ########## @@ -283,6 +284,55 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL. /// fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result; + + /// Take a snapshot of this `PhysicalExpr`, if it is dynamic. + /// + /// "Dynamic" in this case means containing references to structures that may change + /// during plan execution, such as hash tables. + /// + /// This method is used to capture the current state of `PhysicalExpr`s that may contain + /// dynamic references to other operators in order to serialize it over the wire + /// or treat it via downcast matching. + /// + /// You should not call this method directly as it does not handle recursion. + /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the + /// full state of the `PhysicalExpr`. + /// + /// This is expected to return "simple" expressions that do not have mutable state + /// and are composed of DataFusion's built-in `PhysicalExpr` implementations. + /// Callers however should *not* assume anything about the returned expressions + /// since callers and implementers may not agree on what "simple" or "built-in" + /// means. + /// In other words, if you need to searlize a `PhysicalExpr` across the wire + /// you should call this method and then try to serialize the result, + /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully + /// just as if you had not called this method at all. + /// + /// In particular, consider: + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK` + /// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`. 
+ /// This function may return something like `a >= 12`. + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec` + /// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`. + /// This function may return something like `t2.b IN (1, 5, 7)`. + /// + /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations + /// or needs to serialize this state to bytes may not be able to handle these dynamic references. + /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not + /// contain these dynamic references. + /// + /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan + /// and send it across the wire to a remote executor may want to call this method after + /// every batch on the source side and brodcast / update the current snaphot to the remote executor. + /// + /// Note for implementers: this method should *not* handle recursion. + /// Recursion is handled in [`snapshot_physical_expr`]. + fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> { Review Comment: None of the existing expressions need to implement this, right? It seems that no structure other than `DynamicFilterPhysicalExpr` needs to implement it either, as that type appears to be designed to handle all of the tricks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org