alamb commented on code in PR #9913:
URL: https://github.com/apache/arrow-datafusion/pull/9913#discussion_r1548533715
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -370,6 +373,221 @@ impl LogicalPlan {
}
}
+ pub fn transform_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
Review Comment:
Can we please add some comments explaining what this is doing (namely, that
it applies a function to effectively rewrite the expressions of the
LogicalPlan node in place)? It is a very cool API to see, actually, as it will
hopefully save a bunch of copying.
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1080,8 +1298,72 @@ impl LogicalPlan {
}
impl LogicalPlan {
+ pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
+ &self,
+ f: &mut F,
+ ) -> Result<TreeNodeRecursion> {
+ // Compared to the default implementation, we need to invoke
+ // [`Self::apply_subqueries`] before visiting its children
+ handle_visit_recursion!(f(self)?, DOWN);
+ self.apply_subqueries(f)?;
+ self.apply_children(&mut |n| n.apply_with_subqueries(f))
+ }
+
+ pub fn visit_with_subqueries<V: TreeNodeVisitor<Node = Self>>(
+ &self,
+ visitor: &mut V,
+ ) -> Result<TreeNodeRecursion> {
+ // Compared to the default implementation, we need to invoke
+ // [`Self::visit_subqueries`] before visiting its children
+ match visitor.f_down(self)? {
+ TreeNodeRecursion::Continue => {
+ self.visit_subqueries(visitor)?;
+ handle_visit_recursion!(
+ self.apply_children(&mut |n|
n.visit_with_subqueries(visitor))?,
+ UP
+ );
+ visitor.f_up(self)
+ }
+ TreeNodeRecursion::Jump => {
+ self.visit_subqueries(visitor)?;
Review Comment:
Does this treat subqueries as though they were siblings of the node (i.e., if
`TreeNodeRecursion::Jump` is returned, will the subquery children still be
visited)? If so, I think I would find that unexpected (I would expect the
subqueries to be treated like additional children).
##########
datafusion/optimizer/src/analyzer/mod.rs:
##########
@@ -155,7 +155,7 @@ impl Analyzer {
/// Do necessary check and fail the invalid plan
fn check_plan(plan: &LogicalPlan) -> Result<()> {
- plan.apply(&mut |plan: &LogicalPlan| {
+ plan.apply_with_subqueries(&mut |plan: &LogicalPlan| {
Review Comment:
It is interesting that this didn't change any plans / tests.
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1080,8 +1298,72 @@ impl LogicalPlan {
}
impl LogicalPlan {
+ pub fn apply_with_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
Review Comment:
Likewise here, can we please add docstrings?
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -370,6 +373,221 @@ impl LogicalPlan {
}
}
+ pub fn transform_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
+ self,
+ mut f: F,
+ ) -> Result<Transformed<Self>> {
+ Ok(match self {
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema,
+ }) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|expr| {
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema,
+ })
+ }),
+ LogicalPlan::Values(Values { schema, values }) => values
+ .into_iter()
+ .map_until_stop_and_collect(|value| {
+ value.into_iter().map_until_stop_and_collect(&mut f)
+ })?
+ .update_data(|values| LogicalPlan::Values(Values { schema,
values })),
+ LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)?
+ .update_data(|predicate| {
+ LogicalPlan::Filter(Filter { predicate, input })
+ }),
+ LogicalPlan::Repartition(Repartition {
+ input,
+ partitioning_scheme,
+ }) => match partitioning_scheme {
+ Partitioning::Hash(expr, usize) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|expr| Partitioning::Hash(expr, usize)),
+ Partitioning::DistributeBy(expr) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(Partitioning::DistributeBy),
+ Partitioning::RoundRobinBatch(_) =>
Transformed::no(partitioning_scheme),
+ }
+ .update_data(|partitioning_scheme| {
+ LogicalPlan::Repartition(Repartition {
+ input,
+ partitioning_scheme,
+ })
+ }),
+ LogicalPlan::Window(Window {
+ input,
+ window_expr,
+ schema,
+ }) => window_expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|window_expr| {
+ LogicalPlan::Window(Window {
+ input,
+ window_expr,
+ schema,
+ })
+ }),
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ schema,
+ }) => map_until_stop_and_collect!(
+ group_expr.into_iter().map_until_stop_and_collect(&mut f),
+ aggr_expr,
+ aggr_expr.into_iter().map_until_stop_and_collect(&mut f)
+ )?
+ .update_data(|(group_expr, aggr_expr)| {
+ LogicalPlan::Aggregate(Aggregate {
+ input,
+ group_expr,
+ aggr_expr,
+ schema,
+ })
+ }),
+
+ // There are two part of expression for join, equijoin(on) and
non-equijoin(filter).
+ // 1. the first part is `on.len()` equijoin expressions, and the
struct of each expr is `left-on = right-on`.
+ // 2. the second part is non-equijoin(filter).
+ LogicalPlan::Join(Join {
+ left,
+ right,
+ on,
+ filter,
+ join_type,
+ join_constraint,
+ schema,
+ null_equals_null,
+ }) => map_until_stop_and_collect!(
+ on.into_iter().map_until_stop_and_collect(
+ |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
+ ),
+ filter,
+ filter.map_or(Ok::<_, DataFusionError>(Transformed::no(None)),
|e| {
Review Comment:
Is this the magic required to handle `Option`?
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -370,6 +373,221 @@ impl LogicalPlan {
}
}
+ pub fn transform_expressions<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
+ self,
+ mut f: F,
+ ) -> Result<Transformed<Self>> {
+ Ok(match self {
+ LogicalPlan::Projection(Projection {
+ expr,
+ input,
+ schema,
+ }) => expr
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|expr| {
Review Comment:
😍 -- that is quite a neat way to express this pattern and get the ownership
sorted out 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]