LiaCastaneda commented on code in PR #20901:
URL: https://github.com/apache/datafusion/pull/20901#discussion_r2989423299
##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -416,6 +416,28 @@ pub trait PhysicalExpr: Any + Send + Sync + Display +
Debug + DynEq + DynHash {
0
}
+ /// Bind runtime-specific data into this expression, if needed.
+ ///
+ /// This hook lets an expression replace itself with a runtime-bound
version using the given
+ /// `context` (e.g. binding a per-partition view).
+ ///
+ /// Binding is single-pass over the existing tree. If this method returns
a replacement
+ /// expression that itself contains additional bindable nodes, those newly
introduced nodes are
+ /// not rebound in the same call.
+ ///
+ /// You should not call this method directly as it does not handle
recursion. Instead use
+ /// [`bind_runtime_physical_expr`] to handle recursion and bind the full
expression tree.
+ ///
+ /// Note for implementers: this method should not handle recursion.
+ /// Recursion is handled in [`bind_runtime_physical_expr`].
+ fn bind_runtime(
Review Comment:
iiuc the idea is to call this function in `open()` or `execute()` right?
where we have the partitioning information.
Do we want to call this fucntion only when we know we are preserving
partitioning?
##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -856,4 +1028,96 @@ mod test {
&BooleanArray::from(vec![true; 5]),
);
}
+
+ #[test]
+ fn test_bind_runtime_physical_expr_no_transform() {
+ let ctx = RuntimeBindContext {
+ target_key: "right",
+ bound_name: "bound",
+ };
+
+ // TestExpr does not override bind_runtime, so traversal is a no-op.
+ let default_expr: Arc<dyn PhysicalExpr> = Arc::new(TestExpr {});
+ let transformed =
+ super::bind_runtime_physical_expr_opt(Arc::clone(&default_expr),
&ctx)
+ .unwrap();
+
+ assert!(!transformed.transformed);
+ assert!(Arc::ptr_eq(&default_expr, &transformed.data));
+
+ // Context mismatch returns no transform even for bindable nodes.
+ let bindable_expr: Arc<dyn PhysicalExpr> =
Arc::new(RuntimeBindTestExpr::new(
+ "left",
+ BindBehavior::Match("left"),
+ vec![],
+ ));
+ let transformed =
+ super::bind_runtime_physical_expr_opt(Arc::clone(&bindable_expr),
&ctx)
+ .unwrap();
+
+ assert!(!transformed.transformed);
+ assert!(Arc::ptr_eq(&bindable_expr, &transformed.data));
+ }
+
+ #[test]
+ fn test_bind_runtime_physical_expr_recurses() {
+ // Only the right child matches target_key and should be rewritten.
+ let left: Arc<dyn PhysicalExpr> = Arc::new(RuntimeBindTestExpr::new(
+ "left",
+ BindBehavior::Match("left"),
+ vec![],
+ ));
+ let right: Arc<dyn PhysicalExpr> = Arc::new(RuntimeBindTestExpr::new(
+ "right",
+ BindBehavior::Match("right"),
+ vec![],
+ ));
+ let root: Arc<dyn PhysicalExpr> = Arc::new(RuntimeBindTestExpr::new(
+ "root",
+ BindBehavior::None,
+ vec![Arc::clone(&left), Arc::clone(&right)],
+ ));
+ let ctx = RuntimeBindContext {
+ target_key: "right",
+ bound_name: "right_bound",
+ };
+
+ let transformed = super::bind_runtime_physical_expr_opt(root,
&ctx).unwrap();
+ assert!(transformed.transformed);
+
+ let root = transformed
+ .data
+ .as_any()
+ .downcast_ref::<RuntimeBindTestExpr>()
+ .expect("root should be RuntimeBindTestExpr");
+ let left = root.children[0]
+ .as_any()
+ .downcast_ref::<RuntimeBindTestExpr>()
+ .expect("left should be RuntimeBindTestExpr");
+ let right = root.children[1]
+ .as_any()
+ .downcast_ref::<RuntimeBindTestExpr>()
+ .expect("right should be RuntimeBindTestExpr");
+
+ assert_eq!(left.name, "left");
+ assert_eq!(right.name, "right_bound");
+ assert_eq!(right.bind_behavior, BindBehavior::None);
Review Comment:
nit: I think it's worth adding here a small comment mentioning why we expect
these values, it's a bit hard to follow just by looking at the assert
expressions.
##########
datafusion/physical-expr-common/src/physical_expr.rs:
##########
@@ -618,6 +640,46 @@ pub fn snapshot_physical_expr_opt(
})
}
+/// Bind runtime-specific data into the given `PhysicalExpr`.
+///
+/// See the documentation of [`PhysicalExpr::bind_runtime`] for more details.
+///
+/// Runtime binding is applied once over the current expression tree.
+///
+/// # Returns
+///
+/// Returns a runtime-bound expression if any node required binding,
+/// otherwise returns the original expression.
+pub fn bind_runtime_physical_expr(
+ expr: Arc<dyn PhysicalExpr>,
+ context: &(dyn Any + Send + Sync),
+) -> Result<Arc<dyn PhysicalExpr>> {
+ bind_runtime_physical_expr_opt(expr, context).data()
+}
+
+/// Bind runtime-specific data into the given `PhysicalExpr`.
+///
+/// See the documentation of [`PhysicalExpr::bind_runtime`] for more details.
+///
+/// Runtime binding is applied once over the current expression tree.
+///
+/// # Returns
+///
+/// Returns a [`Transformed`] indicating whether any runtime binding happened,
+/// along with the resulting expression.
+pub fn bind_runtime_physical_expr_opt(
+ expr: Arc<dyn PhysicalExpr>,
+ context: &(dyn Any + Send + Sync),
+) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+ expr.transform_up(|e| {
+ if let Some(bound) = e.bind_runtime(context)? {
+ Ok(Transformed::yes(bound))
Review Comment:
This looks very similar to the
[snapshot_physical_expr_opt](https://github.com/apache/datafusion/blob/6e0dde0890ef967183f5fa828195a44cbf99b870/datafusion/physical-expr-common/src/physical_expr.rs#L609)
implementation for `DynamicFilterPhysicalExpr`, no? I also notice that
`snapshot()` is only implemented in `DynamicFilterPhysicalExpr`. Unless I'm
misunderstanding something, it seems like these two methods do similar things,
but `bind_runtime` is the more general version since it can carry context?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]