alamb commented on code in PR #7942:
URL: https://github.com/apache/arrow-datafusion/pull/7942#discussion_r1431798239
##########
datafusion/core/src/physical_optimizer/sort_pushdown.rs:
##########
@@ -18,173 +18,23 @@
use std::sync::Arc;
use crate::physical_optimizer::utils::{
- add_sort_above, is_limit, is_sort_preserving_merge, is_union, is_window,
+ is_limit, is_sort_preserving_merge, is_union, is_window,
};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::calculate_join_output_ordering;
use crate::physical_plan::joins::{HashJoinExec, SortMergeJoinExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+use crate::physical_plan::ExecutionPlan;
-use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement,
};
-use itertools::izip;
-
-/// This is a "data class" we use within the [`EnforceSorting`] rule to push
-/// down [`SortExec`] in the plan. In some cases, we can reduce the total
-/// computational cost by pushing down `SortExec`s through some executors.
-///
-/// [`EnforceSorting`]:
crate::physical_optimizer::enforce_sorting::EnforceSorting
-#[derive(Debug, Clone)]
Review Comment:
This was inlined into `pushdown_sort` via `transform_down_with_payload`
which I think is a nice change 👍
##########
datafusion/common/src/tree_node.rs:
##########
@@ -289,15 +492,80 @@ pub enum RewriteRecursion {
Skip,
}
-/// Controls how the [`TreeNode`] recursion should proceed for
[`TreeNode::visit`].
+/// Controls how the [`TreeNode`] recursion should proceed for
[`TreeNode::visit_down()`] and
+/// [`TreeNode::visit()`].
#[derive(Debug)]
-pub enum VisitRecursion {
- /// Continue the visit to this node tree.
+pub enum TreeNodeRecursion {
+ /// Continue the visit to the next node.
Continue,
- /// Keep recursive but skip applying op on the children
- Skip,
- /// Stop the visit to this node tree.
+
+ /// Prune the current subtree.
+ /// If a preorder visit of a tree node returns
[`TreeNodeRecursion::Prune`] then inner
+ /// children and children will not be visited and postorder visit of the
node will not
+ /// be invoked.
+ Prune,
+
+ /// Stop recursion on current tree.
+ /// If recursion runs on an inner tree then returning
[`TreeNodeRecursion::Stop`] doesn't
+ /// stop recursion on the outer tree.
Stop,
+
+ /// Stop recursion on all (including outer) trees.
+ StopAll,
+}
+
+impl TreeNodeRecursion {
+ pub fn and_then_on_continue<F>(self, f: F) -> Result<TreeNodeRecursion>
Review Comment:
These are neat helpers. It would be useful to document their intended
use cases if possible.
##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -48,27 +48,27 @@ impl CombinePartialFinalAggregate {
impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
fn optimize(
&self,
- plan: Arc<dyn ExecutionPlan>,
+ mut plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
- plan.transform_down(&|plan| {
- let transformed =
- plan.as_any()
- .downcast_ref::<AggregateExec>()
- .and_then(|agg_exec| {
- if matches!(
- agg_exec.mode(),
- AggregateMode::Final |
AggregateMode::FinalPartitioned
- ) {
- agg_exec
- .input()
- .as_any()
- .downcast_ref::<AggregateExec>()
- .and_then(|input_agg_exec| {
- if matches!(
- input_agg_exec.mode(),
- AggregateMode::Partial
- ) && can_combine(
+ plan.transform_down(&mut |plan| {
+ plan.clone()
Review Comment:
It is certainly nice to avoid the clone.
##########
datafusion/expr/src/expr.rs:
##########
@@ -81,8 +81,10 @@ use std::sync::Arc;
/// assert_eq!(binary_expr.op, Operator::Eq);
/// }
/// ```
-#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)]
pub enum Expr {
+ #[default]
Review Comment:
Can you explain a bit about the need / use case for `Expr::Nop`? Could the
same be accomplished with `Option<Expr>`?
##########
datafusion/expr/src/expr.rs:
##########
@@ -954,11 +957,11 @@ impl Expr {
}
/// Remove an alias from an expression if one exists.
- pub fn unalias(self) -> Expr {
- match self {
- Expr::Alias(alias) => alias.expr.as_ref().clone(),
- _ => self,
+ pub fn unalias(&mut self) -> &mut Self {
Review Comment:
Nice spot -- I think we can avoid a copy like this too:
https://github.com/apache/arrow-datafusion/pull/8588
##########
datafusion/common/src/tree_node.rs:
##########
@@ -289,15 +492,80 @@ pub enum RewriteRecursion {
Skip,
}
-/// Controls how the [`TreeNode`] recursion should proceed for
[`TreeNode::visit`].
+/// Controls how the [`TreeNode`] recursion should proceed for
[`TreeNode::visit_down()`] and
+/// [`TreeNode::visit()`].
#[derive(Debug)]
-pub enum VisitRecursion {
- /// Continue the visit to this node tree.
+pub enum TreeNodeRecursion {
+ /// Continue the visit to the next node.
Continue,
- /// Keep recursive but skip applying op on the children
- Skip,
- /// Stop the visit to this node tree.
+
+ /// Prune the current subtree.
+ /// If a preorder visit of a tree node returns
[`TreeNodeRecursion::Prune`] then inner
+ /// children and children will not be visited and postorder visit of the
node will not
+ /// be invoked.
+ Prune,
Review Comment:
Is this equivalent to `RewriteRecursion::Skip`? If so, perhaps we can use
the same terminology.
##########
datafusion/common/src/tree_node.rs:
##########
@@ -289,15 +492,80 @@ pub enum RewriteRecursion {
Skip,
}
-/// Controls how the [`TreeNode`] recursion should proceed for
[`TreeNode::visit`].
+/// Controls how the [`TreeNode`] recursion should proceed for
[`TreeNode::visit_down()`] and
+/// [`TreeNode::visit()`].
#[derive(Debug)]
-pub enum VisitRecursion {
- /// Continue the visit to this node tree.
+pub enum TreeNodeRecursion {
+ /// Continue the visit to the next node.
Continue,
- /// Keep recursive but skip applying op on the children
- Skip,
- /// Stop the visit to this node tree.
+
+ /// Prune the current subtree.
+ /// If a preorder visit of a tree node returns
[`TreeNodeRecursion::Prune`] then inner
+ /// children and children will not be visited and postorder visit of the
node will not
+ /// be invoked.
+ Prune,
+
+ /// Stop recursion on current tree.
+ /// If recursion runs on an inner tree then returning
[`TreeNodeRecursion::Stop`] doesn't
+ /// stop recursion on the outer tree.
Stop,
+
+ /// Stop recursion on all (including outer) trees.
+ StopAll,
+}
+
+impl TreeNodeRecursion {
+ pub fn and_then_on_continue<F>(self, f: F) -> Result<TreeNodeRecursion>
+ where
+ F: FnOnce() -> Result<TreeNodeRecursion>,
+ {
+ match self {
+ TreeNodeRecursion::Continue => f(),
+ o => Ok(o),
+ }
+ }
+
+ pub fn continue_on_prune(self) -> Result<TreeNodeRecursion> {
+ Ok(match self {
+ TreeNodeRecursion::Prune => TreeNodeRecursion::Continue,
+ o => o,
+ })
+ }
+
+ pub fn fail_on_prune(self) -> Result<TreeNodeRecursion> {
Review Comment:
I don't understand the use case for this method -- if there is going to be a
panic, perhaps it would be clearer to put the check directly at the call site
with an explanation for why the situation warrants a panic.
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -311,60 +309,63 @@ impl LogicalPlan {
exprs
}
- /// Calls `f` on all expressions (non-recursively) in the current
- /// logical plan node. This does not include expressions in any
- /// children.
- pub fn inspect_expressions<F, E>(self: &LogicalPlan, mut f: F) ->
Result<(), E>
+ /// Apply `f` on expressions of the plan node.
+ /// `f` is not allowed to return [`TreeNodeRecursion::Prune`].
Review Comment:
Why can't it return `Prune`?
##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -311,60 +309,63 @@ impl LogicalPlan {
exprs
}
- /// Calls `f` on all expressions (non-recursively) in the current
- /// logical plan node. This does not include expressions in any
- /// children.
- pub fn inspect_expressions<F, E>(self: &LogicalPlan, mut f: F) ->
Result<(), E>
+ /// Apply `f` on expressions of the plan node.
+ /// `f` is not allowed to return [`TreeNodeRecursion::Prune`].
+ pub fn apply_expressions<F>(&self, f: &mut F) -> Result<TreeNodeRecursion>
where
- F: FnMut(&Expr) -> Result<(), E>,
+ F: FnMut(&Expr) -> Result<TreeNodeRecursion>,
{
+ let f = &mut |e: &Expr| f(e)?.fail_on_prune();
+
match self {
LogicalPlan::Projection(Projection { expr, .. }) => {
- expr.iter().try_for_each(f)
+ expr.iter().for_each_till_continue(f)
}
LogicalPlan::Values(Values { values, .. }) => {
- values.iter().flatten().try_for_each(f)
+ values.iter().flatten().for_each_till_continue(f)
}
LogicalPlan::Filter(Filter { predicate, .. }) => f(predicate),
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
}) => match partitioning_scheme {
- Partitioning::Hash(expr, _) => expr.iter().try_for_each(f),
- Partitioning::DistributeBy(expr) =>
expr.iter().try_for_each(f),
- Partitioning::RoundRobinBatch(_) => Ok(()),
+ Partitioning::Hash(expr, _) =>
expr.iter().for_each_till_continue(f),
+ Partitioning::DistributeBy(expr) =>
expr.iter().for_each_till_continue(f),
+ Partitioning::RoundRobinBatch(_) =>
Ok(TreeNodeRecursion::Continue),
},
LogicalPlan::Window(Window { window_expr, .. }) => {
- window_expr.iter().try_for_each(f)
+ window_expr.iter().for_each_till_continue(f)
}
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
- }) => group_expr.iter().chain(aggr_expr.iter()).try_for_each(f),
+ }) => group_expr
+ .iter()
+ .chain(aggr_expr.iter())
+ .for_each_till_continue(f),
// There are two part of expression for join, equijoin(on) and
non-equijoin(filter).
// 1. the first part is `on.len()` equijoin expressions, and the
struct of each expr is `left-on = right-on`.
// 2. the second part is non-equijoin(filter).
LogicalPlan::Join(Join { on, filter, .. }) => {
on.iter()
// it not ideal to create an expr here to analyze them,
but could cache it on the Join itself
.map(|(l, r)| Expr::eq(l.clone(), r.clone()))
- .try_for_each(|e| f(&e))?;
-
- if let Some(filter) = filter.as_ref() {
- f(filter)
- } else {
- Ok(())
- }
+ .for_each_till_continue(&mut |e| f(&e))?
+ .and_then_on_continue(||
filter.iter().for_each_till_continue(f))
}
- LogicalPlan::Sort(Sort { expr, .. }) =>
expr.iter().try_for_each(f),
+ LogicalPlan::Sort(Sort { expr, .. }) =>
expr.iter().for_each_till_continue(f),
LogicalPlan::Extension(extension) => {
// would be nice to avoid this copy -- maybe can
// update extension to just observer Exprs
- extension.node.expressions().iter().try_for_each(f)
+ extension
+ .node
+ .expressions()
+ .iter()
+ .for_each_till_continue(f)
Review Comment:
This `for_each_till_continue` is an interesting concept.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]