This is an automated email from the ASF dual-hosted git repository.

mete pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 075ff3ddfc Refactors on TreeNode Implementations (#8395)
075ff3ddfc is described below

commit 075ff3ddfc78680d5da424ed63ffea1e38a6c57d
Author: Berkay Şahin <[email protected]>
AuthorDate: Sun Dec 3 01:44:18 2023 +0300

    Refactors on TreeNode Implementations (#8395)
    
    * minor changes
    
    * PipelineStatePropagator tree refactor
    
    * Remove duplications by children_unbounded()
    
    * Remove on-the-fly tree construction
    
    * Minor changes
    
    ---------
    
    Co-authored-by: Mustafa Akur <[email protected]>
---
 .../core/src/physical_optimizer/join_selection.rs  | 21 ++++++--
 .../src/physical_optimizer/pipeline_checker.rs     | 40 +++++++--------
 datafusion/physical-expr/src/equivalence.rs        |  2 +-
 datafusion/physical-expr/src/sort_properties.rs    | 58 ++++++++--------------
 datafusion/physical-expr/src/utils.rs              | 15 +++---
 5 files changed, 65 insertions(+), 71 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index a7ecd1ca65..0c3ac2d245 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -434,7 +434,7 @@ fn hash_join_convert_symmetric_subrule(
     config_options: &ConfigOptions,
 ) -> Option<Result<PipelineStatePropagator>> {
     if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
-        let ub_flags = &input.children_unbounded;
+        let ub_flags = input.children_unbounded();
         let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
         input.unbounded = left_unbounded || right_unbounded;
         let result = if left_unbounded && right_unbounded {
@@ -511,7 +511,7 @@ fn hash_join_swap_subrule(
     _config_options: &ConfigOptions,
 ) -> Option<Result<PipelineStatePropagator>> {
     if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
-        let ub_flags = &input.children_unbounded;
+        let ub_flags = input.children_unbounded();
         let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
         input.unbounded = left_unbounded || right_unbounded;
         let result = if left_unbounded
@@ -577,7 +577,7 @@ fn apply_subrules(
     }
     let is_unbounded = input
         .plan
-        .unbounded_output(&input.children_unbounded)
+        .unbounded_output(&input.children_unbounded())
         // Treat the case where an operator can not run on unbounded data as
         // if it can and it outputs unbounded data. Do not raise an error yet.
         // Such operators may be fixed, adjusted or replaced later on during
@@ -1253,6 +1253,7 @@ mod hash_join_tests {
     use arrow::record_batch::RecordBatch;
     use datafusion_common::utils::DataPtr;
     use datafusion_common::JoinType;
+    use datafusion_physical_plan::empty::EmptyExec;
     use std::sync::Arc;
 
     struct TestCase {
@@ -1620,10 +1621,22 @@ mod hash_join_tests {
             false,
         )?;
 
+        let children = vec![
+            PipelineStatePropagator {
+                plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+                unbounded: left_unbounded,
+                children: vec![],
+            },
+            PipelineStatePropagator {
+                plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
+                unbounded: right_unbounded,
+                children: vec![],
+            },
+        ];
         let initial_hash_join_state = PipelineStatePropagator {
             plan: Arc::new(join),
             unbounded: false,
-            children_unbounded: vec![left_unbounded, right_unbounded],
+            children,
         };
 
         let optimized_hash_join =
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 43ae7dbfe7..d59248aadf 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -70,19 +70,27 @@ impl PhysicalOptimizerRule for PipelineChecker {
 pub struct PipelineStatePropagator {
     pub(crate) plan: Arc<dyn ExecutionPlan>,
     pub(crate) unbounded: bool,
-    pub(crate) children_unbounded: Vec<bool>,
+    pub(crate) children: Vec<PipelineStatePropagator>,
 }
 
 impl PipelineStatePropagator {
     /// Constructs a new, default pipelining state.
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        let length = plan.children().len();
+        let children = plan.children();
         PipelineStatePropagator {
             plan,
             unbounded: false,
-            children_unbounded: vec![false; length],
+            children: children.into_iter().map(Self::new).collect(),
         }
     }
+
+    /// Returns the children unboundedness information.
+    pub fn children_unbounded(&self) -> Vec<bool> {
+        self.children
+            .iter()
+            .map(|c| c.unbounded)
+            .collect::<Vec<_>>()
+    }
 }
 
 impl TreeNode for PipelineStatePropagator {
@@ -90,9 +98,8 @@ impl TreeNode for PipelineStatePropagator {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        let children = self.plan.children();
-        for child in children {
-            match op(&PipelineStatePropagator::new(child))? {
+        for child in &self.children {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -106,25 +113,18 @@ impl TreeNode for PipelineStatePropagator {
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        let children = self.plan.children();
-        if !children.is_empty() {
-            let new_children = children
+        if !self.children.is_empty() {
+            let new_children = self
+                .children
                 .into_iter()
-                .map(PipelineStatePropagator::new)
                 .map(transform)
                 .collect::<Result<Vec<_>>>()?;
-            let children_unbounded = new_children
-                .iter()
-                .map(|c| c.unbounded)
-                .collect::<Vec<bool>>();
-            let children_plans = new_children
-                .into_iter()
-                .map(|child| child.plan)
-                .collect::<Vec<_>>();
+            let children_plans = new_children.iter().map(|c| c.plan.clone()).collect();
+
             Ok(PipelineStatePropagator {
                 plan: with_new_children_if_necessary(self.plan, children_plans)?.into(),
                 unbounded: self.unbounded,
-                children_unbounded,
+                children: new_children,
             })
         } else {
             Ok(self)
@@ -149,7 +149,7 @@ pub fn check_finiteness_requirements(
     }
     input
         .plan
-        .unbounded_output(&input.children_unbounded)
+        .unbounded_output(&input.children_unbounded())
         .map(|value| {
             input.unbounded = value;
             Transformed::Yes(input)
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index f9f03300f5..4a562f4ef1 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -1520,7 +1520,7 @@ fn update_ordering(
         node.state = SortProperties::Ordered(options);
     } else if !node.expr.children().is_empty() {
         // We have an intermediate (non-leaf) node, account for its children:
-        node.state = node.expr.get_ordering(&node.children_states);
+        node.state = node.expr.get_ordering(&node.children_state());
     } else if node.expr.as_any().is::<Literal>() {
         // We have a Literal, which is the other possible leaf node type:
         node.state = node.expr.get_ordering(&[]);
diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs
index f8648abdf7..f513744617 100644
--- a/datafusion/physical-expr/src/sort_properties.rs
+++ b/datafusion/physical-expr/src/sort_properties.rs
@@ -17,13 +17,12 @@
 
 use std::{ops::Neg, sync::Arc};
 
-use crate::PhysicalExpr;
 use arrow_schema::SortOptions;
+
+use crate::PhysicalExpr;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::Result;
 
-use itertools::Itertools;
-
 /// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient
 /// to simply use `Option<SortOptions>`: There must be a differentiation between
 /// unordered columns and literal values, since literals may not break the ordering
@@ -35,11 +34,12 @@ use itertools::Itertools;
 /// sorted data; however the ((a_ordered + 999) + c_ordered) expression can. Therefore,
 /// we need two different variants for literals and unordered columns as literals are
 /// often more ordering-friendly under most mathematical operations.
-#[derive(PartialEq, Debug, Clone, Copy)]
+#[derive(PartialEq, Debug, Clone, Copy, Default)]
 pub enum SortProperties {
     /// Use the ordinary [`SortOptions`] struct to represent ordered data:
     Ordered(SortOptions),
     // This alternative represents unordered data:
+    #[default]
     Unordered,
     // Singleton is used for single-valued literal numbers:
     Singleton,
@@ -151,34 +151,24 @@ impl Neg for SortProperties {
 pub struct ExprOrdering {
     pub expr: Arc<dyn PhysicalExpr>,
     pub state: SortProperties,
-    pub children_states: Vec<SortProperties>,
+    pub children: Vec<ExprOrdering>,
 }
 
 impl ExprOrdering {
     /// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states
     /// for `expr` and its children.
     pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
-        let size = expr.children().len();
+        let children = expr.children();
         Self {
             expr,
-            state: SortProperties::Unordered,
-            children_states: vec![SortProperties::Unordered; size],
+            state: Default::default(),
+            children: children.into_iter().map(Self::new).collect(),
         }
     }
 
-    /// Updates this [`ExprOrdering`]'s children states with the given states.
-    pub fn with_new_children(mut self, children_states: Vec<SortProperties>) -> Self {
-        self.children_states = children_states;
-        self
-    }
-
-    /// Creates new [`ExprOrdering`] objects for each child of the expression.
-    pub fn children_expr_orderings(&self) -> Vec<ExprOrdering> {
-        self.expr
-            .children()
-            .into_iter()
-            .map(ExprOrdering::new)
-            .collect()
+    /// Get a reference to each child state.
+    pub fn children_state(&self) -> Vec<SortProperties> {
+        self.children.iter().map(|c| c.state).collect()
     }
 }
 
@@ -187,8 +177,8 @@ impl TreeNode for ExprOrdering {
     where
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
-        for child in self.children_expr_orderings() {
-            match op(&child)? {
+        for child in &self.children {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -197,25 +187,19 @@ impl TreeNode for ExprOrdering {
         Ok(VisitRecursion::Continue)
     }
 
-    fn map_children<F>(self, transform: F) -> Result<Self>
+    fn map_children<F>(mut self, transform: F) -> Result<Self>
     where
         F: FnMut(Self) -> Result<Self>,
     {
-        if self.children_states.is_empty() {
+        if self.children.is_empty() {
             Ok(self)
         } else {
-            let child_expr_orderings = self.children_expr_orderings();
-            // After mapping over the children, the function `F` applies to the
-            // current object and updates its state.
-            Ok(self.with_new_children(
-                child_expr_orderings
-                    .into_iter()
-                    // Update children states after this transformation:
-                    .map(transform)
-                    // Extract the state (i.e. sort properties) information:
-                    .map_ok(|c| c.state)
-                    .collect::<Result<Vec<_>>>()?,
-            ))
+            self.children = self
+                .children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            Ok(self)
         }
     }
 }
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index ed62956de8..71a7ff5fb7 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -129,10 +129,11 @@ pub struct ExprTreeNode<T> {
 
 impl<T> ExprTreeNode<T> {
     pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
+        let children = expr.children();
         ExprTreeNode {
             expr,
             data: None,
-            child_nodes: vec![],
+            child_nodes: children.into_iter().map(Self::new).collect_vec(),
         }
     }
 
@@ -140,12 +141,8 @@ impl<T> ExprTreeNode<T> {
         &self.expr
     }
 
-    pub fn children(&self) -> Vec<ExprTreeNode<T>> {
-        self.expr
-            .children()
-            .into_iter()
-            .map(ExprTreeNode::new)
-            .collect()
+    pub fn children(&self) -> &[ExprTreeNode<T>] {
+        &self.child_nodes
     }
 }
 
@@ -155,7 +152,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
         F: FnMut(&Self) -> Result<VisitRecursion>,
     {
         for child in self.children() {
-            match op(&child)? {
+            match op(child)? {
                 VisitRecursion::Continue => {}
                 VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                 VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -170,7 +167,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
         F: FnMut(Self) -> Result<Self>,
     {
         self.child_nodes = self
-            .children()
+            .child_nodes
             .into_iter()
             .map(transform)
             .collect::<Result<Vec<_>>>()?;

Reply via email to