This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 95ba48bd22 Better Equivalence (ordering and exact equivalence) 
Propagation through ProjectionExec (#8484)
95ba48bd22 is described below

commit 95ba48bd2291dd5c303bdaf88cbb55c79d395930
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Dec 11 20:05:55 2023 +0300

    Better Equivalence (ordering and exact equivalence) Propagation through 
ProjectionExec (#8484)
    
    
    * Better projection: support for complex expressions
    
    
    ---------
    
    Co-authored-by: metesynnada <[email protected]>
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/physical-expr/src/equivalence.rs | 1746 ++++++++++++++++++++++++++-
 1 file changed, 1679 insertions(+), 67 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index 4a562f4ef1..defd7b5786 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::hash::Hash;
+use std::collections::{HashMap, HashSet};
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 use crate::expressions::{Column, Literal};
@@ -26,12 +27,14 @@ use crate::{
     LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr,
     PhysicalSortRequirement,
 };
+
 use arrow::datatypes::SchemaRef;
 use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{JoinSide, JoinType, Result};
 
 use indexmap::IndexSet;
+use itertools::Itertools;
 
 /// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known
 /// to have the same value for all tuples in a relation. These are generated by
@@ -465,31 +468,6 @@ impl EquivalenceGroup {
             .map(|children| expr.clone().with_new_children(children).unwrap())
     }
 
-    /// Projects `ordering` according to the given projection mapping.
-    /// If the resulting ordering is invalid after projection, returns `None`.
-    fn project_ordering(
-        &self,
-        mapping: &ProjectionMapping,
-        ordering: LexOrderingRef,
-    ) -> Option<LexOrdering> {
-        // If any sort expression is invalid after projection, rest of the
-        // ordering shouldn't be projected either. For example, if input 
ordering
-        // is [a ASC, b ASC, c ASC], and column b is not valid after 
projection,
-        // the result should be [a ASC], not [a ASC, c ASC], even if column c 
is
-        // valid after projection.
-        let result = ordering
-            .iter()
-            .map_while(|sort_expr| {
-                self.project_expr(mapping, &sort_expr.expr)
-                    .map(|expr| PhysicalSortExpr {
-                        expr,
-                        options: sort_expr.options,
-                    })
-            })
-            .collect::<Vec<_>>();
-        (!result.is_empty()).then_some(result)
-    }
-
     /// Projects this equivalence group according to the given projection 
mapping.
     pub fn project(&self, mapping: &ProjectionMapping) -> Self {
         let projected_classes = self.iter().filter_map(|cls| {
@@ -724,8 +702,21 @@ impl OrderingEquivalenceClass {
     // Append orderings in `other` to all existing orderings in this 
equivalence
     // class.
     pub fn join_suffix(mut self, other: &Self) -> Self {
-        for ordering in other.iter() {
-            for idx in 0..self.orderings.len() {
+        let n_ordering = self.orderings.len();
+        // Replicate entries before cross product
+        let n_cross = std::cmp::max(n_ordering, other.len() * n_ordering);
+        self.orderings = self
+            .orderings
+            .iter()
+            .cloned()
+            .cycle()
+            .take(n_cross)
+            .collect();
+        // Suffix orderings of other to the current orderings.
+        for (outer_idx, ordering) in other.iter().enumerate() {
+            for idx in 0..n_ordering {
+                // Calculate cross product index
+                let idx = outer_idx * n_ordering + idx;
                 self.orderings[idx].extend(ordering.iter().cloned());
             }
         }
@@ -1196,6 +1187,181 @@ impl EquivalenceProperties {
         self.eq_group.project_expr(projection_mapping, expr)
     }
 
+    /// Constructs a dependency map based on existing orderings referred to in
+    /// the projection.
+    ///
+    /// This function analyzes the orderings in the normalized 
order-equivalence
+    /// class and builds a dependency map. The dependency map captures 
relationships
+    /// between expressions within the orderings, helping to identify 
dependencies
+    /// and construct valid projected orderings during projection operations.
+    ///
+    /// # Parameters
+    ///
+    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
+    ///   relationship between source and target expressions.
+    ///
+    /// # Returns
+    ///
+    /// A [`DependencyMap`] representing the dependency map, where each
+    /// [`DependencyNode`] contains dependencies for the key 
[`PhysicalSortExpr`].
+    ///
+    /// # Example
+    ///
+    /// Assume we have two equivalent orderings: `[a ASC, b ASC]` and `[a ASC, 
c ASC]`,
+    /// and the projection mapping is `[a -> a_new, b -> b_new, b + c -> b + 
c]`.
+    /// Then, the dependency map will be:
+    ///
+    /// ```text
+    /// a ASC: Node {Some(a_new ASC), HashSet{}}
+    /// b ASC: Node {Some(b_new ASC), HashSet{a ASC}}
+    /// c ASC: Node {None, HashSet{a ASC}}
+    /// ```
+    fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> 
DependencyMap {
+        let mut dependency_map = HashMap::new();
+        for ordering in self.normalized_oeq_class().iter() {
+            for (idx, sort_expr) in ordering.iter().enumerate() {
+                let target_sort_expr =
+                    self.project_expr(&sort_expr.expr, mapping).map(|expr| {
+                        PhysicalSortExpr {
+                            expr,
+                            options: sort_expr.options,
+                        }
+                    });
+                let is_projected = target_sort_expr.is_some();
+                if is_projected
+                    || mapping
+                        .iter()
+                        .any(|(source, _)| expr_refers(source, 
&sort_expr.expr))
+                {
+                    // Previous ordering is a dependency. Note that there is no
+                    // dependency for a leading ordering (i.e. the first sort
+                    // expression).
+                    let dependency = idx.checked_sub(1).map(|a| &ordering[a]);
+                    // Add sort expressions that can be projected or referred 
to
+                    // by any of the projection expressions to the dependency 
map:
+                    dependency_map
+                        .entry(sort_expr.clone())
+                        .or_insert_with(|| DependencyNode {
+                            target_sort_expr: target_sort_expr.clone(),
+                            dependencies: HashSet::new(),
+                        })
+                        .insert_dependency(dependency);
+                }
+                if !is_projected {
+                    // If we can not project, stop constructing the dependency
+                    // map as remaining dependencies will be invalid after 
projection.
+                    break;
+                }
+            }
+        }
+        dependency_map
+    }
+
+    /// Returns a new `ProjectionMapping` where source expressions are 
normalized.
+    ///
+    /// This normalization ensures that source expressions are transformed 
into a
+    /// consistent representation. This is beneficial for algorithms that rely 
on
+    /// exact equalities, as it allows for more precise and reliable 
comparisons.
+    ///
+    /// # Parameters
+    ///
+    /// - `mapping`: A reference to the original `ProjectionMapping` to be 
normalized.
+    ///
+    /// # Returns
+    ///
+    /// A new `ProjectionMapping` with normalized source expressions.
+    fn normalized_mapping(&self, mapping: &ProjectionMapping) -> 
ProjectionMapping {
+        // Construct the mapping where source expressions are normalized. This
+        // way, the algorithms below can work on exact equalities.
+        ProjectionMapping {
+            map: mapping
+                .iter()
+                .map(|(source, target)| {
+                    let normalized_source = 
self.eq_group.normalize_expr(source.clone());
+                    (normalized_source, target.clone())
+                })
+                .collect(),
+        }
+    }
+
+    /// Computes projected orderings based on a given projection mapping.
+    ///
+    /// This function takes a `ProjectionMapping` and computes the possible
+    /// orderings for the projected expressions. It considers dependencies
+    /// between expressions and generates valid orderings according to the
+    /// specified sort properties.
+    ///
+    /// # Parameters
+    ///
+    /// - `mapping`: A reference to the `ProjectionMapping` that defines the
+    ///   relationship between source and target expressions.
+    ///
+    /// # Returns
+    ///
+    /// A vector of `LexOrdering` containing all valid orderings after 
projection.
+    fn projected_orderings(&self, mapping: &ProjectionMapping) -> 
Vec<LexOrdering> {
+        let mapping = self.normalized_mapping(mapping);
+
+        // Get dependency map for existing orderings:
+        let dependency_map = self.construct_dependency_map(&mapping);
+
+        let orderings = mapping.iter().flat_map(|(source, target)| {
+            referred_dependencies(&dependency_map, source)
+                .into_iter()
+                .filter_map(|relevant_deps| {
+                    if let SortProperties::Ordered(options) =
+                        get_expr_ordering(source, &relevant_deps)
+                    {
+                        Some((options, relevant_deps))
+                    } else {
+                        // Do not consider unordered cases
+                        None
+                    }
+                })
+                .flat_map(|(options, relevant_deps)| {
+                    let sort_expr = PhysicalSortExpr {
+                        expr: target.clone(),
+                        options,
+                    };
+                    // Generate dependent orderings (i.e. prefixes for 
`sort_expr`):
+                    let mut dependency_orderings =
+                        generate_dependency_orderings(&relevant_deps, 
&dependency_map);
+                    // Append `sort_expr` to the dependent orderings:
+                    for ordering in dependency_orderings.iter_mut() {
+                        ordering.push(sort_expr.clone());
+                    }
+                    dependency_orderings
+                })
+        });
+
+        // Add valid projected orderings. For example, if existing ordering is
+        // `a + b` and projection is `[a -> a_new, b -> b_new]`, we need to
+        // preserve `a_new + b_new` as ordered. Please note that `a_new` and
+        // `b_new` themselves need not be ordered. Such dependencies cannot be
+        // deduced via the pass above.
+        let projected_orderings = dependency_map.iter().flat_map(|(sort_expr, 
node)| {
+            let mut prefixes = construct_prefix_orderings(sort_expr, 
&dependency_map);
+            if prefixes.is_empty() {
+                // If prefix is empty, there is no dependency. Insert
+                // empty ordering:
+                prefixes = vec![vec![]];
+            }
+            // Append current ordering on top of its dependencies:
+            for ordering in prefixes.iter_mut() {
+                if let Some(target) = &node.target_sort_expr {
+                    ordering.push(target.clone())
+                }
+            }
+            prefixes
+        });
+
+        // Simplify each ordering by removing redundant sections:
+        orderings
+            .chain(projected_orderings)
+            .map(collapse_lex_ordering)
+            .collect()
+    }
+
     /// Projects constants based on the provided `ProjectionMapping`.
     ///
     /// This function takes a `ProjectionMapping` and identifies/projects
@@ -1240,28 +1406,13 @@ impl EquivalenceProperties {
         projection_mapping: &ProjectionMapping,
         output_schema: SchemaRef,
     ) -> Self {
-        let mut projected_orderings = self
-            .oeq_class
-            .iter()
-            .filter_map(|order| 
self.eq_group.project_ordering(projection_mapping, order))
-            .collect::<Vec<_>>();
-        for (source, target) in projection_mapping.iter() {
-            let expr_ordering = ExprOrdering::new(source.clone())
-                .transform_up(&|expr| Ok(update_ordering(expr, self)))
-                // Guaranteed to always return `Ok`.
-                .unwrap();
-            if let SortProperties::Ordered(options) = expr_ordering.state {
-                // Push new ordering to the state.
-                projected_orderings.push(vec![PhysicalSortExpr {
-                    expr: target.clone(),
-                    options,
-                }]);
-            }
-        }
+        let projected_constants = self.projected_constants(projection_mapping);
+        let projected_eq_group = self.eq_group.project(projection_mapping);
+        let projected_orderings = self.projected_orderings(projection_mapping);
         Self {
-            eq_group: self.eq_group.project(projection_mapping),
+            eq_group: projected_eq_group,
             oeq_class: OrderingEquivalenceClass::new(projected_orderings),
-            constants: self.projected_constants(projection_mapping),
+            constants: projected_constants,
             schema: output_schema,
         }
     }
@@ -1397,6 +1548,270 @@ fn is_constant_recurse(
     !children.is_empty() && children.iter().all(|c| 
is_constant_recurse(constants, c))
 }
 
+/// This function examines whether a referring expression directly refers to a
+/// given referred expression or if any of its children in the expression tree
+/// refer to the specified expression.
+///
+/// # Parameters
+///
+/// - `referring_expr`: A reference to the referring expression (`Arc<dyn 
PhysicalExpr>`).
+/// - `referred_expr`: A reference to the referred expression (`Arc<dyn 
PhysicalExpr>`)
+///
+/// # Returns
+///
+/// A boolean value indicating whether `referring_expr` refers to (i.e. needs it
+/// to evaluate its result) `referred_expr` or not.
+fn expr_refers(
+    referring_expr: &Arc<dyn PhysicalExpr>,
+    referred_expr: &Arc<dyn PhysicalExpr>,
+) -> bool {
+    referring_expr.eq(referred_expr)
+        || referring_expr
+            .children()
+            .iter()
+            .any(|child| expr_refers(child, referred_expr))
+}
+
+/// Wrapper struct for `Arc<dyn PhysicalExpr>` to use them as keys in a hash 
map.
+#[derive(Debug, Clone)]
+struct ExprWrapper(Arc<dyn PhysicalExpr>);
+
+impl PartialEq<Self> for ExprWrapper {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.eq(&other.0)
+    }
+}
+
+impl Eq for ExprWrapper {}
+
+impl Hash for ExprWrapper {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.0.hash(state);
+    }
+}
+
+/// This function analyzes the dependency map to collect referred dependencies 
for
+/// a given source expression.
+///
+/// # Parameters
+///
+/// - `dependency_map`: A reference to the `DependencyMap` where each
+///   `PhysicalSortExpr` is associated with a `DependencyNode`.
+/// - `source`: A reference to the source expression (`Arc<dyn PhysicalExpr>`)
+///   for which relevant dependencies need to be identified.
+///
+/// # Returns
+///
+/// A `Vec<Dependencies>` containing the dependencies for the given source
+/// expression. These dependencies are expressions that are referred to by
+/// the source expression based on the provided dependency map.
+fn referred_dependencies(
+    dependency_map: &DependencyMap,
+    source: &Arc<dyn PhysicalExpr>,
+) -> Vec<Dependencies> {
+    // Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
+    let mut expr_to_sort_exprs = HashMap::<ExprWrapper, Dependencies>::new();
+    for sort_expr in dependency_map
+        .keys()
+        .filter(|sort_expr| expr_refers(source, &sort_expr.expr))
+    {
+        let key = ExprWrapper(sort_expr.expr.clone());
+        expr_to_sort_exprs
+            .entry(key)
+            .or_default()
+            .insert(sort_expr.clone());
+    }
+
+    // Generate all valid dependencies for the source. For example, if the 
source
+    // is `a + b` and the map is `[a -> (a ASC, a DESC), b -> (b ASC)]`, we get
+    // `vec![HashSet(a ASC, b ASC), HashSet(a DESC, b ASC)]`.
+    expr_to_sort_exprs
+        .values()
+        .multi_cartesian_product()
+        .map(|referred_deps| referred_deps.into_iter().cloned().collect())
+        .collect()
+}
+
+/// This function recursively analyzes the dependencies of the given sort
+/// expression within the given dependency map to construct lexicographical
+/// orderings that include the sort expression and its dependencies.
+///
+/// # Parameters
+///
+/// - `referred_sort_expr`: A reference to the sort expression 
(`PhysicalSortExpr`)
+///   for which lexicographical orderings satisfying its dependencies are to be
+///   constructed.
+/// - `dependency_map`: A reference to the `DependencyMap` that contains
+///   dependencies for different `PhysicalSortExpr`s.
+///
+/// # Returns
+///
+/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the 
given
+/// sort expression and its dependencies.
+fn construct_orderings(
+    referred_sort_expr: &PhysicalSortExpr,
+    dependency_map: &DependencyMap,
+) -> Vec<LexOrdering> {
+    // We are sure that `referred_sort_expr` is inside `dependency_map`.
+    let node = &dependency_map[referred_sort_expr];
+    // Since we work on intermediate nodes, we are sure `val.target_sort_expr`
+    // exists.
+    let target_sort_expr = node.target_sort_expr.clone().unwrap();
+    if node.dependencies.is_empty() {
+        vec![vec![target_sort_expr]]
+    } else {
+        node.dependencies
+            .iter()
+            .flat_map(|dep| {
+                let mut orderings = construct_orderings(dep, dependency_map);
+                for ordering in orderings.iter_mut() {
+                    ordering.push(target_sort_expr.clone())
+                }
+                orderings
+            })
+            .collect()
+    }
+}
+
+/// This function retrieves the dependencies of the given relevant sort 
expression
+/// from the given dependency map. It then constructs prefix orderings by
+/// recursively analyzing the dependencies and including them in the orderings.
+///
+/// # Parameters
+///
+/// - `relevant_sort_expr`: A reference to the relevant sort expression
+///   (`PhysicalSortExpr`) for which prefix orderings are to be constructed.
+/// - `dependency_map`: A reference to the `DependencyMap` containing 
dependencies.
+///
+/// # Returns
+///
+/// A vector of prefix orderings (`Vec<LexOrdering>`) based on the given 
relevant
+/// sort expression and its dependencies.
+fn construct_prefix_orderings(
+    relevant_sort_expr: &PhysicalSortExpr,
+    dependency_map: &DependencyMap,
+) -> Vec<LexOrdering> {
+    dependency_map[relevant_sort_expr]
+        .dependencies
+        .iter()
+        .flat_map(|dep| construct_orderings(dep, dependency_map))
+        .collect()
+}
+
+/// Given a set of relevant dependencies (`relevant_deps`) and a map of 
dependencies
+/// (`dependency_map`), this function generates all possible prefix orderings
+/// based on the given dependencies.
+///
+/// # Parameters
+///
+/// * `dependencies` - A reference to the dependencies.
+/// * `dependency_map` - A reference to the map of dependencies for 
expressions.
+///
+/// # Returns
+///
+/// A vector of lexical orderings (`Vec<LexOrdering>`) representing all valid 
orderings
+/// based on the given dependencies.
+fn generate_dependency_orderings(
+    dependencies: &Dependencies,
+    dependency_map: &DependencyMap,
+) -> Vec<LexOrdering> {
+    // Construct all the valid prefix orderings for each expression appearing
+    // in the projection:
+    let relevant_prefixes = dependencies
+        .iter()
+        .flat_map(|dep| {
+            let prefixes = construct_prefix_orderings(dep, dependency_map);
+            (!prefixes.is_empty()).then_some(prefixes)
+        })
+        .collect::<Vec<_>>();
+
+    // No dependency, dependent is a leading ordering.
+    if relevant_prefixes.is_empty() {
+        // Return an empty ordering:
+        return vec![vec![]];
+    }
+
+    // Generate all possible orderings where dependencies are satisfied for the
+    // current projection expression. For example, if expression is `a + b 
ASC`,
+    // and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC`
+    // is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and
+    // `[d DESC, c ASC, a + b ASC]`.
+    relevant_prefixes
+        .into_iter()
+        .multi_cartesian_product()
+        .flat_map(|prefix_orderings| {
+            prefix_orderings
+                .iter()
+                .permutations(prefix_orderings.len())
+                .map(|prefixes| 
prefixes.into_iter().flatten().cloned().collect())
+                .collect::<Vec<_>>()
+        })
+        .collect()
+}
+
+/// This function examines the given expression and the sort expressions it
+/// refers to in order to determine the ordering properties of the expression.
+///
+/// # Parameters
+///
+/// - `expr`: A reference to the source expression (`Arc<dyn PhysicalExpr>`) 
for
+///   which ordering properties need to be determined.
+/// - `dependencies`: A reference to `Dependencies`, containing sort 
expressions
+///   referred to by `expr`.
+///
+/// # Returns
+///
+/// A `SortProperties` indicating the ordering information of the given 
expression.
+fn get_expr_ordering(
+    expr: &Arc<dyn PhysicalExpr>,
+    dependencies: &Dependencies,
+) -> SortProperties {
+    if let Some(column_order) = dependencies.iter().find(|&order| 
expr.eq(&order.expr)) {
+        // If exact match is found, return its ordering.
+        SortProperties::Ordered(column_order.options)
+    } else {
+        // Find orderings of its children
+        let child_states = expr
+            .children()
+            .iter()
+            .map(|child| get_expr_ordering(child, dependencies))
+            .collect::<Vec<_>>();
+        // Calculate expression ordering using ordering of its children.
+        expr.get_ordering(&child_states)
+    }
+}
+
+/// Represents a node in the dependency map used to construct projected 
orderings.
+///
+/// A `DependencyNode` contains information about a particular sort expression,
+/// including its target sort expression and a set of dependencies on other 
sort
+/// expressions.
+///
+/// # Fields
+///
+/// - `target_sort_expr`: An optional `PhysicalSortExpr` representing the 
target
+///   sort expression associated with the node. It is `None` if the sort 
expression
+///   cannot be projected.
+/// - `dependencies`: A [`Dependencies`] containing dependencies on other sort
+///   expressions that are referred to by the target sort expression.
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct DependencyNode {
+    target_sort_expr: Option<PhysicalSortExpr>,
+    dependencies: Dependencies,
+}
+
+impl DependencyNode {
+    // Insert dependency to the state (if exists).
+    fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
+        if let Some(dep) = dependency {
+            self.dependencies.insert(dep.clone());
+        }
+    }
+}
+
+type DependencyMap = HashMap<PhysicalSortExpr, DependencyNode>;
+type Dependencies = HashSet<PhysicalSortExpr>;
+
 /// Calculate ordering equivalence properties for the given join operation.
 pub fn join_equivalence_properties(
     left: EquivalenceProperties,
@@ -1544,7 +1959,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow_array::{ArrayRef, Float64Array, RecordBatch, UInt32Array};
     use arrow_schema::{Fields, SortOptions, TimeUnit};
-    use datafusion_common::{Result, ScalarValue};
+    use datafusion_common::{plan_datafusion_err, DataFusionError, Result, 
ScalarValue};
     use datafusion_expr::{BuiltinScalarFunction, Operator};
 
     use itertools::{izip, Itertools};
@@ -1552,6 +1967,37 @@ mod tests {
     use rand::seq::SliceRandom;
     use rand::{Rng, SeedableRng};
 
+    fn output_schema(
+        mapping: &ProjectionMapping,
+        input_schema: &Arc<Schema>,
+    ) -> Result<SchemaRef> {
+        // Calculate output schema
+        let fields: Result<Vec<Field>> = mapping
+            .iter()
+            .map(|(source, target)| {
+                let name = target
+                    .as_any()
+                    .downcast_ref::<Column>()
+                    .ok_or_else(|| plan_datafusion_err!("Expects to have 
column"))?
+                    .name();
+                let field = Field::new(
+                    name,
+                    source.data_type(input_schema)?,
+                    source.nullable(input_schema)?,
+                );
+
+                Ok(field)
+            })
+            .collect();
+
+        let output_schema = Arc::new(Schema::new_with_metadata(
+            fields?,
+            input_schema.metadata().clone(),
+        ));
+
+        Ok(output_schema)
+    }
+
     // Generate a schema which consists of 8 columns (a, b, c, d, e, f, g, h)
     fn create_test_schema() -> Result<SchemaRef> {
         let a = Field::new("a", DataType::Int32, true);
@@ -1679,7 +2125,7 @@ mod tests {
             .map(|(expr, options)| {
                 PhysicalSortRequirement::new((*expr).clone(), *options)
             })
-            .collect::<Vec<_>>()
+            .collect()
     }
 
     // Convert each tuple to PhysicalSortExpr
@@ -1692,7 +2138,7 @@ mod tests {
                 expr: (*expr).clone(),
                 options: *options,
             })
-            .collect::<Vec<_>>()
+            .collect()
     }
 
     // Convert each inner tuple to PhysicalSortExpr
@@ -1705,6 +2151,56 @@ mod tests {
             .collect()
     }
 
+    // Convert each tuple to PhysicalSortExpr
+    fn convert_to_sort_exprs_owned(
+        in_data: &[(Arc<dyn PhysicalExpr>, SortOptions)],
+    ) -> Vec<PhysicalSortExpr> {
+        in_data
+            .iter()
+            .map(|(expr, options)| PhysicalSortExpr {
+                expr: (*expr).clone(),
+                options: *options,
+            })
+            .collect()
+    }
+
+    // Convert each inner tuple to PhysicalSortExpr
+    fn convert_to_orderings_owned(
+        orderings: &[Vec<(Arc<dyn PhysicalExpr>, SortOptions)>],
+    ) -> Vec<Vec<PhysicalSortExpr>> {
+        orderings
+            .iter()
+            .map(|sort_exprs| convert_to_sort_exprs_owned(sort_exprs))
+            .collect()
+    }
+
+    // Apply projection to the input_data, return projected equivalence 
properties and record batch
+    fn apply_projection(
+        proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)>,
+        input_data: &RecordBatch,
+        input_eq_properties: &EquivalenceProperties,
+    ) -> Result<(RecordBatch, EquivalenceProperties)> {
+        let input_schema = input_data.schema();
+        let projection_mapping = ProjectionMapping::try_new(&proj_exprs, 
&input_schema)?;
+
+        let output_schema = output_schema(&projection_mapping, &input_schema)?;
+        let num_rows = input_data.num_rows();
+        // Apply projection to the input record batch.
+        let projected_values = projection_mapping
+            .iter()
+            .map(|(source, _target)| 
source.evaluate(input_data)?.into_array(num_rows))
+            .collect::<Result<Vec<_>>>()?;
+        let projected_batch = if projected_values.is_empty() {
+            RecordBatch::new_empty(output_schema.clone())
+        } else {
+            RecordBatch::try_new(output_schema.clone(), projected_values)?
+        };
+
+        let projected_eq =
+            input_eq_properties.project(&projection_mapping, output_schema);
+        Ok((projected_batch, projected_eq))
+    }
+
     #[test]
     fn add_equal_conditions_test() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
@@ -1774,13 +2270,16 @@ mod tests {
         let input_properties = 
EquivalenceProperties::new(input_schema.clone());
         let col_a = col("a", &input_schema)?;
 
-        let out_schema = Arc::new(Schema::new(vec![
-            Field::new("a1", DataType::Int64, true),
-            Field::new("a2", DataType::Int64, true),
-            Field::new("a3", DataType::Int64, true),
-            Field::new("a4", DataType::Int64, true),
-        ]));
+        // a as a1, a as a2, a as a3, a as a4
+        let proj_exprs = vec![
+            (col_a.clone(), "a1".to_string()),
+            (col_a.clone(), "a2".to_string()),
+            (col_a.clone(), "a3".to_string()),
+            (col_a.clone(), "a4".to_string()),
+        ];
+        let projection_mapping = ProjectionMapping::try_new(&proj_exprs, 
&input_schema)?;
 
+        let out_schema = output_schema(&projection_mapping, &input_schema)?;
         // a as a1, a as a2, a as a3, a as a4
         let proj_exprs = vec![
             (col_a.clone(), "a1".to_string()),
@@ -3686,30 +4185,1143 @@ mod tests {
     }
 
     #[test]
-    fn test_expr_consists_of_constants() -> Result<()> {
+    fn project_orderings() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int32, true),
             Field::new("b", DataType::Int32, true),
             Field::new("c", DataType::Int32, true),
             Field::new("d", DataType::Int32, true),
+            Field::new("e", DataType::Int32, true),
             Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), 
true),
         ]));
-        let col_a = col("a", &schema)?;
-        let col_b = col("b", &schema)?;
-        let col_d = col("d", &schema)?;
+        let col_a = &col("a", &schema)?;
+        let col_b = &col("b", &schema)?;
+        let col_c = &col("c", &schema)?;
+        let col_d = &col("d", &schema)?;
+        let col_e = &col("e", &schema)?;
+        let col_ts = &col("ts", &schema)?;
+        let interval = 
Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2))))
+            as Arc<dyn PhysicalExpr>;
+        let date_bin_func = &create_physical_expr(
+            &BuiltinScalarFunction::DateBin,
+            &[interval, col_ts.clone()],
+            &schema,
+            &ExecutionProps::default(),
+        )?;
+        let a_plus_b = Arc::new(BinaryExpr::new(
+            col_a.clone(),
+            Operator::Plus,
+            col_b.clone(),
+        )) as Arc<dyn PhysicalExpr>;
         let b_plus_d = Arc::new(BinaryExpr::new(
             col_b.clone(),
             Operator::Plus,
             col_d.clone(),
         )) as Arc<dyn PhysicalExpr>;
+        let b_plus_e = Arc::new(BinaryExpr::new(
+            col_b.clone(),
+            Operator::Plus,
+            col_e.clone(),
+        )) as Arc<dyn PhysicalExpr>;
+        let c_plus_d = Arc::new(BinaryExpr::new(
+            col_c.clone(),
+            Operator::Plus,
+            col_d.clone(),
+        )) as Arc<dyn PhysicalExpr>;
 
-        let constants = vec![col_a.clone(), col_b.clone()];
-        let expr = b_plus_d.clone();
-        assert!(!is_constant_recurse(&constants, &expr));
+        let option_asc = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let option_desc = SortOptions {
+            descending: true,
+            nulls_first: true,
+        };
 
-        let constants = vec![col_a.clone(), col_b.clone(), col_d.clone()];
-        let expr = b_plus_d.clone();
-        assert!(is_constant_recurse(&constants, &expr));
+        let test_cases = vec![
+            // ---------- TEST CASE 1 ------------
+            (
+                // orderings
+                vec![
+                    // [b ASC]
+                    vec![(col_b, option_asc)],
+                ],
+                // projection exprs
+                vec![(col_b, "b_new".to_string()), (col_a, 
"a_new".to_string())],
+                // expected
+                vec![
+                    // [b_new ASC]
+                    vec![("b_new", option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 2 ------------
+            (
+                // orderings
+                vec![
+                    // empty ordering
+                ],
+                // projection exprs
+                vec![(col_c, "c_new".to_string()), (col_b, 
"b_new".to_string())],
+                // expected
+                vec![
+                    // no ordering at the output
+                ],
+            ),
+            // ---------- TEST CASE 3 ------------
+            (
+                // orderings
+                vec![
+                    // [ts ASC]
+                    vec![(col_ts, option_asc)],
+                ],
+                // projection exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_ts, "ts_new".to_string()),
+                    (date_bin_func, "date_bin_res".to_string()),
+                ],
+                // expected
+                vec![
+                    // [date_bin_res ASC]
+                    vec![("date_bin_res", option_asc)],
+                    // [ts_new ASC]
+                    vec![("ts_new", option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 4 ------------
+            (
+                // orderings
+                vec![
+                    // [a ASC, ts ASC]
+                    vec![(col_a, option_asc), (col_ts, option_asc)],
+                    // [b ASC, ts ASC]
+                    vec![(col_b, option_asc), (col_ts, option_asc)],
+                ],
+                // projection exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_ts, "ts_new".to_string()),
+                    (date_bin_func, "date_bin_res".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, ts_new ASC]
+                    vec![("a_new", option_asc), ("ts_new", option_asc)],
+                    // [a_new ASC, date_bin_res ASC]
+                    vec![("a_new", option_asc), ("date_bin_res", option_asc)],
+                    // [b_new ASC, ts_new ASC]
+                    vec![("b_new", option_asc), ("ts_new", option_asc)],
+                    // [b_new ASC, date_bin_res ASC]
+                    vec![("b_new", option_asc), ("date_bin_res", option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 5 ------------
+            (
+                // orderings
+                vec![
+                    // [a + b ASC]
+                    vec![(&a_plus_b, option_asc)],
+                ],
+                // projection exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (&a_plus_b, "a+b".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a + b ASC]
+                    vec![("a+b", option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 6 ------------
+            (
+                // orderings
+                vec![
+                    // [a + b ASC, c ASC]
+                    vec![(&a_plus_b, option_asc), (&col_c, option_asc)],
+                ],
+                // projection exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_c, "c_new".to_string()),
+                    (&a_plus_b, "a+b".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a + b ASC, c_new ASC]
+                    vec![("a+b", option_asc), ("c_new", option_asc)],
+                ],
+            ),
+            // ------- TEST CASE 7 ----------
+            (
+                vec![
+                    // [a ASC, b ASC]
+                    vec![(col_a, option_asc), (col_b, option_asc)],
+                    // [a ASC, d ASC]
+                    vec![(col_a, option_asc), (col_d, option_asc)],
+                ],
+                // b as b_new, a as a_new, d as d_new, b+d as b+d
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_d, "d_new".to_string()),
+                    (&b_plus_d, "b+d".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, b_new ASC]
+                    vec![("a_new", option_asc), ("b_new", option_asc)],
+                    // [a_new ASC, d_new ASC]
+                    vec![("a_new", option_asc), ("d_new", option_asc)],
+                    // [a_new ASC, b+d ASC]
+                    vec![("a_new", option_asc), ("b+d", option_asc)],
+                ],
+            ),
+            // ------- TEST CASE 8 ----------
+            (
+                // orderings
+                vec![
+                    // [b+d ASC]
+                    vec![(&b_plus_d, option_asc)],
+                ],
+                // proj exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_d, "d_new".to_string()),
+                    (&b_plus_d, "b+d".to_string()),
+                ],
+                // expected
+                vec![
+                    // [b+d ASC]
+                    vec![("b+d", option_asc)],
+                ],
+            ),
+            // ------- TEST CASE 9 ----------
+            (
+                // orderings
+                vec![
+                    // [a ASC, d ASC, b ASC]
+                    vec![
+                        (col_a, option_asc),
+                        (col_d, option_asc),
+                        (col_b, option_asc),
+                    ],
+                    // [c ASC]
+                    vec![(col_c, option_asc)],
+                ],
+                // proj exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_d, "d_new".to_string()),
+                    (col_c, "c_new".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, d_new ASC, b_new ASC]
+                    vec![
+                        ("a_new", option_asc),
+                        ("d_new", option_asc),
+                        ("b_new", option_asc),
+                    ],
+                    // [c_new ASC],
+                    vec![("c_new", option_asc)],
+                ],
+            ),
+            // ------- TEST CASE 10 ----------
+            (
+                vec![
+                    // [a ASC, b ASC, c ASC]
+                    vec![
+                        (col_a, option_asc),
+                        (col_b, option_asc),
+                        (col_c, option_asc),
+                    ],
+                    // [a ASC, d ASC]
+                    vec![(col_a, option_asc), (col_d, option_asc)],
+                ],
+                // proj exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_c, "c_new".to_string()),
+                    (&c_plus_d, "c+d".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, b_new ASC, c_new ASC]
+                    vec![
+                        ("a_new", option_asc),
+                        ("b_new", option_asc),
+                        ("c_new", option_asc),
+                    ],
+                    // [a_new ASC, b_new ASC, c+d ASC]
+                    vec![
+                        ("a_new", option_asc),
+                        ("b_new", option_asc),
+                        ("c+d", option_asc),
+                    ],
+                ],
+            ),
+            // ------- TEST CASE 11 ----------
+            (
+                // orderings
+                vec![
+                    // [a ASC, b ASC]
+                    vec![(col_a, option_asc), (col_b, option_asc)],
+                    // [a ASC, d ASC]
+                    vec![(col_a, option_asc), (col_d, option_asc)],
+                ],
+                // proj exprs
+                vec![
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (&b_plus_d, "b+d".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, b_new ASC]
+                    vec![("a_new", option_asc), ("b_new", option_asc)],
+                    // [a_new ASC, b + d ASC]
+                    vec![("a_new", option_asc), ("b+d", option_asc)],
+                ],
+            ),
+            // ------- TEST CASE 12 ----------
+            (
+                // orderings
+                vec![
+                    // [a ASC, b ASC, c ASC]
+                    vec![
+                        (col_a, option_asc),
+                        (col_b, option_asc),
+                        (col_c, option_asc),
+                    ],
+                ],
+                // proj exprs
+                vec![(col_c, "c_new".to_string()), (col_a, 
"a_new".to_string())],
+                // expected
+                vec![
+                    // [a_new ASC]
+                    vec![("a_new", option_asc)],
+                ],
+            ),
+            // ------- TEST CASE 13 ----------
+            (
+                // orderings
+                vec![
+                    // [a ASC, b ASC, c ASC]
+                    vec![
+                        (col_a, option_asc),
+                        (col_b, option_asc),
+                        (col_c, option_asc),
+                    ],
+                    // [a ASC, a + b ASC, c ASC]
+                    vec![
+                        (col_a, option_asc),
+                        (&a_plus_b, option_asc),
+                        (col_c, option_asc),
+                    ],
+                ],
+                // proj exprs
+                vec![
+                    (col_c, "c_new".to_string()),
+                    (col_b, "b_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (&a_plus_b, "a+b".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, b_new ASC, c_new ASC]
+                    vec![
+                        ("a_new", option_asc),
+                        ("b_new", option_asc),
+                        ("c_new", option_asc),
+                    ],
+                    // [a_new ASC, a+b ASC, c_new ASC]
+                    vec![
+                        ("a_new", option_asc),
+                        ("a+b", option_asc),
+                        ("c_new", option_asc),
+                    ],
+                ],
+            ),
+            // ------- TEST CASE 14 ----------
+            (
+                // orderings
+                vec![
+                    // [a ASC, b ASC]
+                    vec![(col_a, option_asc), (col_b, option_asc)],
+                    // [c ASC, b ASC]
+                    vec![(col_c, option_asc), (col_b, option_asc)],
+                    // [d ASC, e ASC]
+                    vec![(col_d, option_asc), (col_e, option_asc)],
+                ],
+                // proj exprs
+                vec![
+                    (col_c, "c_new".to_string()),
+                    (col_d, "d_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (&b_plus_e, "b+e".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, d_new ASC, b+e ASC]
+                    vec![
+                        ("a_new", option_asc),
+                        ("d_new", option_asc),
+                        ("b+e", option_asc),
+                    ],
+                    // [d_new ASC, a_new ASC, b+e ASC]
+                    vec![
+                        ("d_new", option_asc),
+                        ("a_new", option_asc),
+                        ("b+e", option_asc),
+                    ],
+                    // [c_new ASC, d_new ASC, b+e ASC]
+                    vec![
+                        ("c_new", option_asc),
+                        ("d_new", option_asc),
+                        ("b+e", option_asc),
+                    ],
+                    // [d_new ASC, c_new ASC, b+e ASC]
+                    vec![
+                        ("d_new", option_asc),
+                        ("c_new", option_asc),
+                        ("b+e", option_asc),
+                    ],
+                ],
+            ),
+            // ------- TEST CASE 15 ----------
+            (
+                // orderings
+                vec![
+                    // [a ASC, c ASC, b ASC]
+                    vec![
+                        (col_a, option_asc),
+                        (col_c, option_asc),
+                        (&col_b, option_asc),
+                    ],
+                ],
+                // proj exprs
+                vec![
+                    (col_c, "c_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (&a_plus_b, "a+b".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, c_new ASC, a+b ASC]
+                    vec![
+                        ("a_new", option_asc),
+                        ("c_new", option_asc),
+                        ("a+b", option_asc),
+                    ],
+                ],
+            ),
+            // ------- TEST CASE 16 ----------
+            (
+                // orderings
+                vec![
+                    // [a ASC, b ASC]
+                    vec![(col_a, option_asc), (col_b, option_asc)],
+                    // [c ASC, b DESC]
+                    vec![(col_c, option_asc), (col_b, option_desc)],
+                    // [e ASC]
+                    vec![(col_e, option_asc)],
+                ],
+                // proj exprs
+                vec![
+                    (col_c, "c_new".to_string()),
+                    (col_a, "a_new".to_string()),
+                    (col_b, "b_new".to_string()),
+                    (&b_plus_e, "b+e".to_string()),
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, b_new ASC]
+                    vec![("a_new", option_asc), ("b_new", option_asc)],
+                    // [a_new ASC, b+e ASC]
+                    vec![("a_new", option_asc), ("b+e", option_asc)],
+                    // [c_new ASC, b_new DESC]
+                    vec![("c_new", option_asc), ("b_new", option_desc)],
+                ],
+            ),
+        ];
+
+        for (idx, (orderings, proj_exprs, expected)) in 
test_cases.into_iter().enumerate()
+        {
+            let mut eq_properties = EquivalenceProperties::new(schema.clone());
+
+            let orderings = convert_to_orderings(&orderings);
+            eq_properties.add_new_orderings(orderings);
+
+            let proj_exprs = proj_exprs
+                .into_iter()
+                .map(|(expr, name)| (expr.clone(), name))
+                .collect::<Vec<_>>();
+            let projection_mapping = ProjectionMapping::try_new(&proj_exprs, 
&schema)?;
+            let output_schema = output_schema(&projection_mapping, &schema)?;
+
+            let expected = expected
+                .into_iter()
+                .map(|ordering| {
+                    ordering
+                        .into_iter()
+                        .map(|(name, options)| {
+                            (col(name, &output_schema).unwrap(), options)
+                        })
+                        .collect::<Vec<_>>()
+                })
+                .collect::<Vec<_>>();
+            let expected = convert_to_orderings_owned(&expected);
+
+            let projected_eq = eq_properties.project(&projection_mapping, 
output_schema);
+            let orderings = projected_eq.oeq_class();
+
+            let err_msg = format!(
+                "test_idx: {:?}, actual: {:?}, expected: {:?}, 
projection_mapping: {:?}",
+                idx, orderings.orderings, expected, projection_mapping
+            );
+
+            assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
+            for expected_ordering in &expected {
+                assert!(orderings.contains(expected_ordering), "{}", err_msg)
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn project_orderings2() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+            Field::new("d", DataType::Int32, true),
+            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), 
true),
+        ]));
+        let col_a = &col("a", &schema)?;
+        let col_b = &col("b", &schema)?;
+        let col_c = &col("c", &schema)?;
+        let col_ts = &col("ts", &schema)?;
+        let a_plus_b = Arc::new(BinaryExpr::new(
+            col_a.clone(),
+            Operator::Plus,
+            col_b.clone(),
+        )) as Arc<dyn PhysicalExpr>;
+        let interval = 
Arc::new(Literal::new(ScalarValue::IntervalDayTime(Some(2))))
+            as Arc<dyn PhysicalExpr>;
+        let date_bin_ts = &create_physical_expr(
+            &BuiltinScalarFunction::DateBin,
+            &[interval, col_ts.clone()],
+            &schema,
+            &ExecutionProps::default(),
+        )?;
+
+        let round_c = &create_physical_expr(
+            &BuiltinScalarFunction::Round,
+            &[col_c.clone()],
+            &schema,
+            &ExecutionProps::default(),
+        )?;
+
+        let option_asc = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+
+        let proj_exprs = vec![
+            (col_b, "b_new".to_string()),
+            (col_a, "a_new".to_string()),
+            (col_c, "c_new".to_string()),
+            (date_bin_ts, "date_bin_res".to_string()),
+            (round_c, "round_c_res".to_string()),
+        ];
+        let proj_exprs = proj_exprs
+            .into_iter()
+            .map(|(expr, name)| (expr.clone(), name))
+            .collect::<Vec<_>>();
+        let projection_mapping = ProjectionMapping::try_new(&proj_exprs, 
&schema)?;
+        let output_schema = output_schema(&projection_mapping, &schema)?;
+
+        let col_a_new = &col("a_new", &output_schema)?;
+        let col_b_new = &col("b_new", &output_schema)?;
+        let col_c_new = &col("c_new", &output_schema)?;
+        let col_date_bin_res = &col("date_bin_res", &output_schema)?;
+        let col_round_c_res = &col("round_c_res", &output_schema)?;
+        let a_new_plus_b_new = Arc::new(BinaryExpr::new(
+            col_a_new.clone(),
+            Operator::Plus,
+            col_b_new.clone(),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let test_cases = vec![
+            // ---------- TEST CASE 1 ------------
+            (
+                // orderings
+                vec![
+                    // [a ASC]
+                    vec![(col_a, option_asc)],
+                ],
+                // expected
+                vec![
+                    // [a_new ASC]
+                    vec![(col_a_new, option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 2 ------------
+            (
+                // orderings
+                vec![
+                    // [a+b ASC]
+                    vec![(&a_plus_b, option_asc)],
+                ],
+                // expected
+                vec![
+                    // [a_new + b_new ASC]
+                    vec![(&a_new_plus_b_new, option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 3 ------------
+            (
+                // orderings
+                vec![
+                    // [a ASC, ts ASC]
+                    vec![(col_a, option_asc), (col_ts, option_asc)],
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, date_bin_res ASC]
+                    vec![(col_a_new, option_asc), (col_date_bin_res, 
option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 4 ------------
+            (
+                // orderings
+                vec![
+                    // [a ASC, ts ASC, b ASC]
+                    vec![
+                        (col_a, option_asc),
+                        (col_ts, option_asc),
+                        (col_b, option_asc),
+                    ],
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, date_bin_res ASC]
+                    // Please note that result is not [a_new ASC, date_bin_res 
ASC, b_new ASC]
+                    // because, datebin_res may not be 1-1 function. Hence 
without introducing ts
+                    // dependency we cannot guarantee any ordering after 
date_bin_res column.
+                    vec![(col_a_new, option_asc), (col_date_bin_res, 
option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 5 ------------
+            (
+                // orderings
+                vec![
+                    // [a ASC, c ASC]
+                    vec![(col_a, option_asc), (col_c, option_asc)],
+                ],
+                // expected
+                vec![
+                    // [a_new ASC, round_c_res ASC]
+                    vec![(col_a_new, option_asc), (col_round_c_res, 
option_asc)],
+                    // [a_new ASC, c_new ASC]
+                    vec![(col_a_new, option_asc), (col_c_new, option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 6 ------------
+            (
+                // orderings
+                vec![
+                    // [c ASC, b ASC]
+                    vec![(col_c, option_asc), (col_b, option_asc)],
+                ],
+                // expected
+                vec![
+                    // [round_c_res ASC]
+                    vec![(col_round_c_res, option_asc)],
+                    // [c_new ASC, b_new ASC]
+                    vec![(col_c_new, option_asc), (col_b_new, option_asc)],
+                ],
+            ),
+            // ---------- TEST CASE 7 ------------
+            (
+                // orderings
+                vec![
+                    // [a+b ASC, c ASC]
+                    vec![(&a_plus_b, option_asc), (col_c, option_asc)],
+                ],
+                // expected
+                vec![
+                    // [a_new + b_new ASC, round_c_res ASC]
+                    vec![
+                        (&a_new_plus_b_new, option_asc),
+                        (&col_round_c_res, option_asc),
+                    ],
+                    // [a+b ASC, c_new ASC]
+                    vec![(&a_new_plus_b_new, option_asc), (col_c_new, 
option_asc)],
+                ],
+            ),
+        ];
+
+        for (idx, (orderings, expected)) in test_cases.iter().enumerate() {
+            let mut eq_properties = EquivalenceProperties::new(schema.clone());
+
+            let orderings = convert_to_orderings(orderings);
+            eq_properties.add_new_orderings(orderings);
+
+            let expected = convert_to_orderings(expected);
+
+            let projected_eq =
+                eq_properties.project(&projection_mapping, 
output_schema.clone());
+            let orderings = projected_eq.oeq_class();
+
+            let err_msg = format!(
+                "test idx: {:?}, actual: {:?}, expected: {:?}, 
projection_mapping: {:?}",
+                idx, orderings.orderings, expected, projection_mapping
+            );
+
+            assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
+            for expected_ordering in &expected {
+                assert!(orderings.contains(expected_ordering), "{}", err_msg)
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn project_orderings3() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+            Field::new("d", DataType::Int32, true),
+            Field::new("e", DataType::Int32, true),
+            Field::new("f", DataType::Int32, true),
+        ]));
+        let col_a = &col("a", &schema)?;
+        let col_b = &col("b", &schema)?;
+        let col_c = &col("c", &schema)?;
+        let col_d = &col("d", &schema)?;
+        let col_e = &col("e", &schema)?;
+        let col_f = &col("f", &schema)?;
+        let a_plus_b = Arc::new(BinaryExpr::new(
+            col_a.clone(),
+            Operator::Plus,
+            col_b.clone(),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let option_asc = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+
+        let proj_exprs = vec![
+            (col_c, "c_new".to_string()),
+            (col_d, "d_new".to_string()),
+            (&a_plus_b, "a+b".to_string()),
+        ];
+        let proj_exprs = proj_exprs
+            .into_iter()
+            .map(|(expr, name)| (expr.clone(), name))
+            .collect::<Vec<_>>();
+        let projection_mapping = ProjectionMapping::try_new(&proj_exprs, 
&schema)?;
+        let output_schema = output_schema(&projection_mapping, &schema)?;
+
+        let col_a_plus_b_new = &col("a+b", &output_schema)?;
+        let col_c_new = &col("c_new", &output_schema)?;
+        let col_d_new = &col("d_new", &output_schema)?;
+
+        let test_cases = vec![
+            // ---------- TEST CASE 1 ------------
+            (
+                // orderings
+                vec![
+                    // [d ASC, b ASC]
+                    vec![(col_d, option_asc), (col_b, option_asc)],
+                    // [c ASC, a ASC]
+                    vec![(col_c, option_asc), (col_a, option_asc)],
+                ],
+                // equal conditions
+                vec![],
+                // expected
+                vec![
+                    // [d_new ASC, c_new ASC, a+b ASC]
+                    vec![
+                        (col_d_new, option_asc),
+                        (col_c_new, option_asc),
+                        (col_a_plus_b_new, option_asc),
+                    ],
+                    // [c_new ASC, d_new ASC, a+b ASC]
+                    vec![
+                        (col_c_new, option_asc),
+                        (col_d_new, option_asc),
+                        (col_a_plus_b_new, option_asc),
+                    ],
+                ],
+            ),
+            // ---------- TEST CASE 2 ------------
+            (
+                // orderings
+                vec![
+                    // [d ASC, b ASC]
+                    vec![(col_d, option_asc), (col_b, option_asc)],
+                    // [c ASC, e ASC], Please note that a=e
+                    vec![(col_c, option_asc), (col_e, option_asc)],
+                ],
+                // equal conditions
+                vec![(col_e, col_a)],
+                // expected
+                vec![
+                    // [d_new ASC, c_new ASC, a+b ASC]
+                    vec![
+                        (col_d_new, option_asc),
+                        (col_c_new, option_asc),
+                        (col_a_plus_b_new, option_asc),
+                    ],
+                    // [c_new ASC, d_new ASC, a+b ASC]
+                    vec![
+                        (col_c_new, option_asc),
+                        (col_d_new, option_asc),
+                        (col_a_plus_b_new, option_asc),
+                    ],
+                ],
+            ),
+            // ---------- TEST CASE 3 ------------
+            (
+                // orderings
+                vec![
+                    // [d ASC, b ASC]
+                    vec![(col_d, option_asc), (col_b, option_asc)],
+                    // [c ASC, e ASC], Please note that a=f
+                    vec![(col_c, option_asc), (col_e, option_asc)],
+                ],
+                // equal conditions
+                vec![(col_a, col_f)],
+                // expected
+                vec![
+                    // [d_new ASC]
+                    vec![(col_d_new, option_asc)],
+                    // [c_new ASC]
+                    vec![(col_c_new, option_asc)],
+                ],
+            ),
+        ];
+        for (orderings, equal_columns, expected) in test_cases {
+            let mut eq_properties = EquivalenceProperties::new(schema.clone());
+            for (lhs, rhs) in equal_columns {
+                eq_properties.add_equal_conditions(lhs, rhs);
+            }
+
+            let orderings = convert_to_orderings(&orderings);
+            eq_properties.add_new_orderings(orderings);
+
+            let expected = convert_to_orderings(&expected);
+
+            let projected_eq =
+                eq_properties.project(&projection_mapping, 
output_schema.clone());
+            let orderings = projected_eq.oeq_class();
+
+            let err_msg = format!(
+                "actual: {:?}, expected: {:?}, projection_mapping: {:?}",
+                orderings.orderings, expected, projection_mapping
+            );
+
+            assert_eq!(orderings.len(), expected.len(), "{}", err_msg);
+            for expected_ordering in &expected {
+                assert!(orderings.contains(expected_ordering), "{}", err_msg)
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn project_orderings_random() -> Result<()> {
+        const N_RANDOM_SCHEMA: usize = 20;
+        const N_ELEMENTS: usize = 125;
+        const N_DISTINCT: usize = 5;
+
+        for seed in 0..N_RANDOM_SCHEMA {
+            // Create a random schema with random properties
+            let (test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
+            // Generate a data that satisfies properties given
+            let table_data_with_properties =
+                generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
+            // Floor(a)
+            let floor_a = create_physical_expr(
+                &BuiltinScalarFunction::Floor,
+                &[col("a", &test_schema)?],
+                &test_schema,
+                &ExecutionProps::default(),
+            )?;
+            // a + b
+            let a_plus_b = Arc::new(BinaryExpr::new(
+                col("a", &test_schema)?,
+                Operator::Plus,
+                col("b", &test_schema)?,
+            )) as Arc<dyn PhysicalExpr>;
+            let proj_exprs = vec![
+                (col("a", &test_schema)?, "a_new"),
+                (col("b", &test_schema)?, "b_new"),
+                (col("c", &test_schema)?, "c_new"),
+                (col("d", &test_schema)?, "d_new"),
+                (col("e", &test_schema)?, "e_new"),
+                (col("f", &test_schema)?, "f_new"),
+                (floor_a, "floor(a)"),
+                (a_plus_b, "a+b"),
+            ];
+
+            for n_req in 0..=proj_exprs.len() {
+                for proj_exprs in proj_exprs.iter().combinations(n_req) {
+                    let proj_exprs = proj_exprs
+                        .into_iter()
+                        .map(|(expr, name)| (expr.clone(), name.to_string()))
+                        .collect::<Vec<_>>();
+                    let (projected_batch, projected_eq) = apply_projection(
+                        proj_exprs.clone(),
+                        &table_data_with_properties,
+                        &eq_properties,
+                    )?;
+
+                    // Make sure each ordering after projection is valid.
+                    for ordering in projected_eq.oeq_class().iter() {
+                        let err_msg = format!(
+                            "Error in test case ordering:{:?}, 
eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}, proj_exprs: {:?}",
+                            ordering, eq_properties.oeq_class, 
eq_properties.eq_group, eq_properties.constants, proj_exprs
+                        );
+                        // Since ordered section satisfies schema, we expect
+                        // that result will be same after sort (e.g sort was 
unnecessary).
+                        assert!(
+                            is_table_same_after_sort(
+                                ordering.clone(),
+                                projected_batch.clone(),
+                            )?,
+                            "{}",
+                            err_msg
+                        );
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn ordering_satisfy_after_projection_random() -> Result<()> {
+        const N_RANDOM_SCHEMA: usize = 20;
+        const N_ELEMENTS: usize = 125;
+        const N_DISTINCT: usize = 5;
+        const SORT_OPTIONS: SortOptions = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+
+        for seed in 0..N_RANDOM_SCHEMA {
+            // Create a random schema with random properties
+            let (test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
+            // Generate a data that satisfies properties given
+            let table_data_with_properties =
+                generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
+            // Floor(a)
+            let floor_a = create_physical_expr(
+                &BuiltinScalarFunction::Floor,
+                &[col("a", &test_schema)?],
+                &test_schema,
+                &ExecutionProps::default(),
+            )?;
+            // a + b
+            let a_plus_b = Arc::new(BinaryExpr::new(
+                col("a", &test_schema)?,
+                Operator::Plus,
+                col("b", &test_schema)?,
+            )) as Arc<dyn PhysicalExpr>;
+            let proj_exprs = vec![
+                (col("a", &test_schema)?, "a_new"),
+                (col("b", &test_schema)?, "b_new"),
+                (col("c", &test_schema)?, "c_new"),
+                (col("d", &test_schema)?, "d_new"),
+                (col("e", &test_schema)?, "e_new"),
+                (col("f", &test_schema)?, "f_new"),
+                (floor_a, "floor(a)"),
+                (a_plus_b, "a+b"),
+            ];
+
+            for n_req in 0..=proj_exprs.len() {
+                for proj_exprs in proj_exprs.iter().combinations(n_req) {
+                    let proj_exprs = proj_exprs
+                        .into_iter()
+                        .map(|(expr, name)| (expr.clone(), name.to_string()))
+                        .collect::<Vec<_>>();
+                    let (projected_batch, projected_eq) = apply_projection(
+                        proj_exprs.clone(),
+                        &table_data_with_properties,
+                        &eq_properties,
+                    )?;
+
+                    let projection_mapping =
+                        ProjectionMapping::try_new(&proj_exprs, &test_schema)?;
+
+                    let projected_exprs = projection_mapping
+                        .iter()
+                        .map(|(_source, target)| target.clone())
+                        .collect::<Vec<_>>();
+
+                    for n_req in 0..=projected_exprs.len() {
+                        for exprs in 
projected_exprs.iter().combinations(n_req) {
+                            let requirement = exprs
+                                .into_iter()
+                                .map(|expr| PhysicalSortExpr {
+                                    expr: expr.clone(),
+                                    options: SORT_OPTIONS,
+                                })
+                                .collect::<Vec<_>>();
+                            let expected = is_table_same_after_sort(
+                                requirement.clone(),
+                                projected_batch.clone(),
+                            )?;
+                            let err_msg = format!(
+                                "Error in test case requirement:{:?}, 
expected: {:?}, eq_properties.oeq_class: {:?}, eq_properties.eq_group: {:?}, 
eq_properties.constants: {:?}, projected_eq.oeq_class: {:?}, 
projected_eq.eq_group: {:?}, projected_eq.constants: {:?}, projection_mapping: 
{:?}",
+                                requirement, expected, 
eq_properties.oeq_class, eq_properties.eq_group, eq_properties.constants, 
projected_eq.oeq_class, projected_eq.eq_group, projected_eq.constants, 
projection_mapping
+                            );
+                            // Check whether ordering_satisfy API result and
+                            // experimental result matches.
+                            assert_eq!(
+                                projected_eq.ordering_satisfy(&requirement),
+                                expected,
+                                "{}",
+                                err_msg
+                            );
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_expr_consists_of_constants() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+            Field::new("d", DataType::Int32, true),
+            Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), 
true),
+        ]));
+        let col_a = col("a", &schema)?;
+        let col_b = col("b", &schema)?;
+        let col_d = col("d", &schema)?;
+        let b_plus_d = Arc::new(BinaryExpr::new(
+            col_b.clone(),
+            Operator::Plus,
+            col_d.clone(),
+        )) as Arc<dyn PhysicalExpr>;
+
+        let constants = vec![col_a.clone(), col_b.clone()];
+        let expr = b_plus_d.clone();
+        assert!(!is_constant_recurse(&constants, &expr));
+
+        let constants = vec![col_a.clone(), col_b.clone(), col_d.clone()];
+        let expr = b_plus_d.clone();
+        assert!(is_constant_recurse(&constants, &expr));
+        Ok(())
+    }
+
+    #[test]
+    fn test_join_equivalence_properties() -> Result<()> {
+        let schema = create_test_schema()?;
+        let col_a = &col("a", &schema)?;
+        let col_b = &col("b", &schema)?;
+        let col_c = &col("c", &schema)?;
+        let offset = schema.fields.len();
+        let col_a2 = &add_offset_to_expr(col_a.clone(), offset);
+        let col_b2 = &add_offset_to_expr(col_b.clone(), offset);
+        let option_asc = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+        let test_cases = vec![
+            // ------- TEST CASE 1 --------
+            // [a ASC], [b ASC]
+            (
+                // [a ASC], [b ASC]
+                vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
+                // [a ASC], [b ASC]
+                vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
+                // expected [a ASC, a2 ASC], [a ASC, b2 ASC], [b ASC, a2 ASC], 
[b ASC, b2 ASC]
+                vec![
+                    vec![(col_a, option_asc), (col_a2, option_asc)],
+                    vec![(col_a, option_asc), (col_b2, option_asc)],
+                    vec![(col_b, option_asc), (col_a2, option_asc)],
+                    vec![(col_b, option_asc), (col_b2, option_asc)],
+                ],
+            ),
+            // ------- TEST CASE 2 --------
+            // [a ASC], [b ASC]
+            (
+                // [a ASC], [b ASC], [c ASC]
+                vec![
+                    vec![(col_a, option_asc)],
+                    vec![(col_b, option_asc)],
+                    vec![(col_c, option_asc)],
+                ],
+                // [a ASC], [b ASC]
+                vec![vec![(col_a, option_asc)], vec![(col_b, option_asc)]],
+                // expected [a ASC, a2 ASC], [a ASC, b2 ASC], [b ASC, a2 ASC], 
[b ASC, b2 ASC], [c ASC, a2 ASC], [c ASC, b2 ASC]
+                vec![
+                    vec![(col_a, option_asc), (col_a2, option_asc)],
+                    vec![(col_a, option_asc), (col_b2, option_asc)],
+                    vec![(col_b, option_asc), (col_a2, option_asc)],
+                    vec![(col_b, option_asc), (col_b2, option_asc)],
+                    vec![(col_c, option_asc), (col_a2, option_asc)],
+                    vec![(col_c, option_asc), (col_b2, option_asc)],
+                ],
+            ),
+        ];
+        for (left_orderings, right_orderings, expected) in test_cases {
+            let mut left_eq_properties = 
EquivalenceProperties::new(schema.clone());
+            let mut right_eq_properties = 
EquivalenceProperties::new(schema.clone());
+            let left_orderings = convert_to_orderings(&left_orderings);
+            let right_orderings = convert_to_orderings(&right_orderings);
+            let expected = convert_to_orderings(&expected);
+            left_eq_properties.add_new_orderings(left_orderings);
+            right_eq_properties.add_new_orderings(right_orderings);
+            let join_eq = join_equivalence_properties(
+                left_eq_properties,
+                right_eq_properties,
+                &JoinType::Inner,
+                Arc::new(Schema::empty()),
+                &[true, false],
+                Some(JoinSide::Left),
+                &[],
+            );
+            let orderings = &join_eq.oeq_class.orderings;
+            let err_msg = format!("expected: {:?}, actual:{:?}", expected, 
orderings);
+            assert_eq!(
+                join_eq.oeq_class.orderings.len(),
+                expected.len(),
+                "{}",
+                err_msg
+            );
+            for ordering in orderings {
+                assert!(
+                    expected.contains(ordering),
+                    "{}, ordering: {:?}",
+                    err_msg,
+                    ordering
+                );
+            }
+        }
         Ok(())
     }
 }

Reply via email to