This is an automated email from the ASF dual-hosted git repository.
mete pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 075ff3ddfc Refactors on TreeNode Implementations (#8395)
075ff3ddfc is described below
commit 075ff3ddfc78680d5da424ed63ffea1e38a6c57d
Author: Berkay Şahin <[email protected]>
AuthorDate: Sun Dec 3 01:44:18 2023 +0300
Refactors on TreeNode Implementations (#8395)
* minor changes
* PipelineStatePropagator tree refactor
* Remove duplications by children_unbounded()
* Remove on-the-fly tree construction
* Minor changes
---------
Co-authored-by: Mustafa Akur <[email protected]>
---
.../core/src/physical_optimizer/join_selection.rs | 21 ++++++--
.../src/physical_optimizer/pipeline_checker.rs | 40 +++++++--------
datafusion/physical-expr/src/equivalence.rs | 2 +-
datafusion/physical-expr/src/sort_properties.rs | 58 ++++++++--------------
datafusion/physical-expr/src/utils.rs | 15 +++---
5 files changed, 65 insertions(+), 71 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index a7ecd1ca65..0c3ac2d245 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -434,7 +434,7 @@ fn hash_join_convert_symmetric_subrule(
config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
if let Some(hash_join) =
input.plan.as_any().downcast_ref::<HashJoinExec>() {
- let ub_flags = &input.children_unbounded;
+ let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
input.unbounded = left_unbounded || right_unbounded;
let result = if left_unbounded && right_unbounded {
@@ -511,7 +511,7 @@ fn hash_join_swap_subrule(
_config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
if let Some(hash_join) =
input.plan.as_any().downcast_ref::<HashJoinExec>() {
- let ub_flags = &input.children_unbounded;
+ let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
input.unbounded = left_unbounded || right_unbounded;
let result = if left_unbounded
@@ -577,7 +577,7 @@ fn apply_subrules(
}
let is_unbounded = input
.plan
- .unbounded_output(&input.children_unbounded)
+ .unbounded_output(&input.children_unbounded())
// Treat the case where an operator can not run on unbounded data as
// if it can and it outputs unbounded data. Do not raise an error yet.
// Such operators may be fixed, adjusted or replaced later on during
@@ -1253,6 +1253,7 @@ mod hash_join_tests {
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::DataPtr;
use datafusion_common::JoinType;
+ use datafusion_physical_plan::empty::EmptyExec;
use std::sync::Arc;
struct TestCase {
@@ -1620,10 +1621,22 @@ mod hash_join_tests {
false,
)?;
+ let children = vec![
+ PipelineStatePropagator {
+ plan: Arc::new(EmptyExec::new(false,
Arc::new(Schema::empty()))),
+ unbounded: left_unbounded,
+ children: vec![],
+ },
+ PipelineStatePropagator {
+ plan: Arc::new(EmptyExec::new(false,
Arc::new(Schema::empty()))),
+ unbounded: right_unbounded,
+ children: vec![],
+ },
+ ];
let initial_hash_join_state = PipelineStatePropagator {
plan: Arc::new(join),
unbounded: false,
- children_unbounded: vec![left_unbounded, right_unbounded],
+ children,
};
let optimized_hash_join =
diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 43ae7dbfe7..d59248aadf 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -70,19 +70,27 @@ impl PhysicalOptimizerRule for PipelineChecker {
pub struct PipelineStatePropagator {
pub(crate) plan: Arc<dyn ExecutionPlan>,
pub(crate) unbounded: bool,
- pub(crate) children_unbounded: Vec<bool>,
+ pub(crate) children: Vec<PipelineStatePropagator>,
}
impl PipelineStatePropagator {
/// Constructs a new, default pipelining state.
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
- let length = plan.children().len();
+ let children = plan.children();
PipelineStatePropagator {
plan,
unbounded: false,
- children_unbounded: vec![false; length],
+ children: children.into_iter().map(Self::new).collect(),
}
}
+
+ /// Returns the children unboundedness information.
+ pub fn children_unbounded(&self) -> Vec<bool> {
+ self.children
+ .iter()
+ .map(|c| c.unbounded)
+ .collect::<Vec<_>>()
+ }
}
impl TreeNode for PipelineStatePropagator {
@@ -90,9 +98,8 @@ impl TreeNode for PipelineStatePropagator {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- let children = self.plan.children();
- for child in children {
- match op(&PipelineStatePropagator::new(child))? {
+ for child in &self.children {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -106,25 +113,18 @@ impl TreeNode for PipelineStatePropagator {
where
F: FnMut(Self) -> Result<Self>,
{
- let children = self.plan.children();
- if !children.is_empty() {
- let new_children = children
+ if !self.children.is_empty() {
+ let new_children = self
+ .children
.into_iter()
- .map(PipelineStatePropagator::new)
.map(transform)
.collect::<Result<Vec<_>>>()?;
- let children_unbounded = new_children
- .iter()
- .map(|c| c.unbounded)
- .collect::<Vec<bool>>();
- let children_plans = new_children
- .into_iter()
- .map(|child| child.plan)
- .collect::<Vec<_>>();
+ let children_plans = new_children.iter().map(|c|
c.plan.clone()).collect();
+
Ok(PipelineStatePropagator {
plan: with_new_children_if_necessary(self.plan,
children_plans)?.into(),
unbounded: self.unbounded,
- children_unbounded,
+ children: new_children,
})
} else {
Ok(self)
@@ -149,7 +149,7 @@ pub fn check_finiteness_requirements(
}
input
.plan
- .unbounded_output(&input.children_unbounded)
+ .unbounded_output(&input.children_unbounded())
.map(|value| {
input.unbounded = value;
Transformed::Yes(input)
diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence.rs
index f9f03300f5..4a562f4ef1 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -1520,7 +1520,7 @@ fn update_ordering(
node.state = SortProperties::Ordered(options);
} else if !node.expr.children().is_empty() {
// We have an intermediate (non-leaf) node, account for its children:
- node.state = node.expr.get_ordering(&node.children_states);
+ node.state = node.expr.get_ordering(&node.children_state());
} else if node.expr.as_any().is::<Literal>() {
// We have a Literal, which is the other possible leaf node type:
node.state = node.expr.get_ordering(&[]);
diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs
index f8648abdf7..f513744617 100644
--- a/datafusion/physical-expr/src/sort_properties.rs
+++ b/datafusion/physical-expr/src/sort_properties.rs
@@ -17,13 +17,12 @@
use std::{ops::Neg, sync::Arc};
-use crate::PhysicalExpr;
use arrow_schema::SortOptions;
+
+use crate::PhysicalExpr;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::Result;
-use itertools::Itertools;
-
/// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient
/// to simply use `Option<SortOptions>`: There must be a differentiation between
/// unordered columns and literal values, since literals may not break the ordering
@@ -35,11 +34,12 @@ use itertools::Itertools;
/// sorted data; however the ((a_ordered + 999) + c_ordered) expression can. Therefore,
/// we need two different variants for literals and unordered columns as literals are
/// often more ordering-friendly under most mathematical operations.
-#[derive(PartialEq, Debug, Clone, Copy)]
+#[derive(PartialEq, Debug, Clone, Copy, Default)]
pub enum SortProperties {
/// Use the ordinary [`SortOptions`] struct to represent ordered data:
Ordered(SortOptions),
// This alternative represents unordered data:
+ #[default]
Unordered,
// Singleton is used for single-valued literal numbers:
Singleton,
@@ -151,34 +151,24 @@ impl Neg for SortProperties {
pub struct ExprOrdering {
pub expr: Arc<dyn PhysicalExpr>,
pub state: SortProperties,
- pub children_states: Vec<SortProperties>,
+ pub children: Vec<ExprOrdering>,
}
impl ExprOrdering {
/// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states
/// for `expr` and its children.
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
- let size = expr.children().len();
+ let children = expr.children();
Self {
expr,
- state: SortProperties::Unordered,
- children_states: vec![SortProperties::Unordered; size],
+ state: Default::default(),
+ children: children.into_iter().map(Self::new).collect(),
}
}
- /// Updates this [`ExprOrdering`]'s children states with the given states.
- pub fn with_new_children(mut self, children_states: Vec<SortProperties>)
-> Self {
- self.children_states = children_states;
- self
- }
-
- /// Creates new [`ExprOrdering`] objects for each child of the expression.
- pub fn children_expr_orderings(&self) -> Vec<ExprOrdering> {
- self.expr
- .children()
- .into_iter()
- .map(ExprOrdering::new)
- .collect()
+ /// Returns a copy of each child's [`SortProperties`] state, in child order.
+ pub fn children_state(&self) -> Vec<SortProperties> {
+ self.children.iter().map(|c| c.state).collect()
}
}
@@ -187,8 +177,8 @@ impl TreeNode for ExprOrdering {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
- for child in self.children_expr_orderings() {
- match op(&child)? {
+ for child in &self.children {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -197,25 +187,19 @@ impl TreeNode for ExprOrdering {
Ok(VisitRecursion::Continue)
}
- fn map_children<F>(self, transform: F) -> Result<Self>
+ fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
- if self.children_states.is_empty() {
+ if self.children.is_empty() {
Ok(self)
} else {
- let child_expr_orderings = self.children_expr_orderings();
- // After mapping over the children, the function `F` applies to the
- // current object and updates its state.
- Ok(self.with_new_children(
- child_expr_orderings
- .into_iter()
- // Update children states after this transformation:
- .map(transform)
- // Extract the state (i.e. sort properties) information:
- .map_ok(|c| c.state)
- .collect::<Result<Vec<_>>>()?,
- ))
+ self.children = self
+ .children
+ .into_iter()
+ .map(transform)
+ .collect::<Result<Vec<_>>>()?;
+ Ok(self)
}
}
}
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index ed62956de8..71a7ff5fb7 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -129,10 +129,11 @@ pub struct ExprTreeNode<T> {
impl<T> ExprTreeNode<T> {
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
+ let children = expr.children();
ExprTreeNode {
expr,
data: None,
- child_nodes: vec![],
+ child_nodes: children.into_iter().map(Self::new).collect_vec(),
}
}
@@ -140,12 +141,8 @@ impl<T> ExprTreeNode<T> {
&self.expr
}
- pub fn children(&self) -> Vec<ExprTreeNode<T>> {
- self.expr
- .children()
- .into_iter()
- .map(ExprTreeNode::new)
- .collect()
+ pub fn children(&self) -> &[ExprTreeNode<T>] {
+ &self.child_nodes
}
}
@@ -155,7 +152,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
- match op(&child)? {
+ match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
@@ -170,7 +167,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
F: FnMut(Self) -> Result<Self>,
{
self.child_nodes = self
- .children()
+ .child_nodes
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;