This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a2ac00da1b Improve Union Equivalence Propagation (#11506)
a2ac00da1b is described below

commit a2ac00da1b3aa7879317ae88d1b356b27f49f887
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Jul 22 23:51:43 2024 +0300

    Improve Union Equivalence Propagation (#11506)
    
    * Initial commit
    
    * Fix formatting
    
    * Minor changes
    
    * Fix failing test
    
    * Change union calculation algorithm to make it symmetric
    
    * Minor changes
    
    * Add unit tests
    
    * Simplifications
    
    * Review Part 1
    
    * Move test and union equivalence
    
    * Add new tests
    
    * Support for union with different schema
    
    * Address reviews
    
    * Review Part 2
    
    * Add new tests
    
    * Final Review
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../physical-expr-common/src/physical_expr.rs      |  33 +-
 datafusion/physical-expr/src/equivalence/mod.rs    |   4 +-
 .../physical-expr/src/equivalence/properties.rs    | 641 ++++++++++++++++++---
 datafusion/physical-expr/src/lib.rs                |   2 +-
 datafusion/physical-plan/src/common.rs             | 356 +-----------
 datafusion/physical-plan/src/union.rs              | 115 ++--
 datafusion/sqllogictest/test_files/order.slt       |   7 +
 7 files changed, 647 insertions(+), 511 deletions(-)

diff --git a/datafusion/physical-expr-common/src/physical_expr.rs 
b/datafusion/physical-expr-common/src/physical_expr.rs
index 1998f14396..c74fb9c2d1 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -20,13 +20,15 @@ use std::fmt::{Debug, Display};
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
+use crate::expressions::column::Column;
 use crate::utils::scatter;
 
 use arrow::array::BooleanArray;
 use arrow::compute::filter_record_batch;
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{internal_err, not_impl_err, Result};
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{internal_err, not_impl_err, plan_err, Result};
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::ColumnarValue;
@@ -191,6 +193,33 @@ pub fn with_new_children_if_necessary(
     }
 }
 
+/// Rewrites an expression according to new schema; i.e. changes the columns it
+/// refers to with the column at corresponding index in the new schema. Returns
+/// an error if the given schema has fewer columns than the original schema.
+/// Note that the resulting expression may not be valid if data types in the
+/// new schema are incompatible with expression nodes.
+pub fn with_new_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    Ok(expr
+        .transform_up(|expr| {
+            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                let idx = col.index();
+                let Some(field) = schema.fields().get(idx) else {
+                    return plan_err!(
+                        "New schema has fewer columns than original schema"
+                    );
+                };
+                let new_col = Column::new(field.name(), idx);
+                Ok(Transformed::yes(Arc::new(new_col) as _))
+            } else {
+                Ok(Transformed::no(expr))
+            }
+        })?
+        .data)
+}
+
 pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
     if any.is::<Arc<dyn PhysicalExpr>>() {
         any.downcast_ref::<Arc<dyn PhysicalExpr>>()
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs 
b/datafusion/physical-expr/src/equivalence/mod.rs
index 83f94057f7..b9228282b0 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -30,7 +30,9 @@ mod properties;
 pub use class::{ConstExpr, EquivalenceClass, EquivalenceGroup};
 pub use ordering::OrderingEquivalenceClass;
 pub use projection::ProjectionMapping;
-pub use properties::{join_equivalence_properties, EquivalenceProperties};
+pub use properties::{
+    calculate_union, join_equivalence_properties, EquivalenceProperties,
+};
 
 /// This function constructs a duplicate-free `LexOrderingReq` by filtering out
 /// duplicate entries that have same physical expression inside. For example,
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 8c327fbaf4..64c22064d4 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -21,7 +21,8 @@ use std::sync::Arc;
 use super::ordering::collapse_lex_ordering;
 use crate::equivalence::class::const_exprs_contains;
 use crate::equivalence::{
-    collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, 
ProjectionMapping,
+    collapse_lex_req, EquivalenceClass, EquivalenceGroup, 
OrderingEquivalenceClass,
+    ProjectionMapping,
 };
 use crate::expressions::Literal;
 use crate::{
@@ -32,11 +33,12 @@ use crate::{
 
 use arrow_schema::{SchemaRef, SortOptions};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinSide, JoinType, Result};
+use datafusion_common::{plan_err, JoinSide, JoinType, Result};
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_physical_expr_common::expressions::column::Column;
 use datafusion_physical_expr_common::expressions::CastExpr;
+use datafusion_physical_expr_common::physical_expr::with_new_schema;
 use datafusion_physical_expr_common::utils::ExprPropertiesNode;
 
 use indexmap::{IndexMap, IndexSet};
@@ -536,33 +538,6 @@ impl EquivalenceProperties {
             .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
     }
 
-    /// Calculates the "meet" of the given orderings (`lhs` and `rhs`).
-    /// The meet of a set of orderings is the finest ordering that is satisfied
-    /// by all the orderings in that set. For details, see:
-    ///
-    /// <https://en.wikipedia.org/wiki/Join_and_meet>
-    ///
-    /// If there is no ordering that satisfies both `lhs` and `rhs`, returns
-    /// `None`. As an example, the meet of orderings `[a ASC]` and `[a ASC, b 
ASC]`
-    /// is `[a ASC]`.
-    pub fn get_meet_ordering(
-        &self,
-        lhs: LexOrderingRef,
-        rhs: LexOrderingRef,
-    ) -> Option<LexOrdering> {
-        let lhs = self.normalize_sort_exprs(lhs);
-        let rhs = self.normalize_sort_exprs(rhs);
-        let mut meet = vec![];
-        for (lhs, rhs) in lhs.into_iter().zip(rhs.into_iter()) {
-            if lhs.eq(&rhs) {
-                meet.push(lhs);
-            } else {
-                break;
-            }
-        }
-        (!meet.is_empty()).then_some(meet)
-    }
-
     /// we substitute the ordering according to input expression type, this is 
a simplified version
     /// In this case, we just substitute when the expression satisfy the 
following condition:
     /// I. just have one column and is a CAST expression
@@ -1007,6 +982,74 @@ impl EquivalenceProperties {
             .map(|node| node.data)
             .unwrap_or(ExprProperties::new_unknown())
     }
+
+    /// Transforms this `EquivalenceProperties` into a new 
`EquivalenceProperties`
+    /// by mapping columns in the original schema to columns in the new schema
+    /// by index.
+    pub fn with_new_schema(self, schema: SchemaRef) -> Result<Self> {
+        // The new schema and the original schema are aligned when they have the
+        // same number of columns, and fields at the same index have the same
+        // type in both schemas.
+        let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
+            && self
+                .schema
+                .fields
+                .iter()
+                .zip(schema.fields.iter())
+                .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
+        if !schemas_aligned {
+            // Rewriting equivalence properties in terms of new schema is not
+            // safe when schemas are not aligned:
+            return plan_err!(
+                "Cannot rewrite old_schema:{:?} with new schema: {:?}",
+                self.schema,
+                schema
+            );
+        }
+        // Rewrite constants according to new schema:
+        let new_constants = self
+            .constants
+            .into_iter()
+            .map(|const_expr| {
+                let across_partitions = const_expr.across_partitions();
+                let new_const_expr = with_new_schema(const_expr.owned_expr(), 
&schema)?;
+                Ok(ConstExpr::new(new_const_expr)
+                    .with_across_partitions(across_partitions))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Rewrite orderings according to new schema:
+        let mut new_orderings = vec![];
+        for ordering in self.oeq_class.orderings {
+            let new_ordering = ordering
+                .into_iter()
+                .map(|mut sort_expr| {
+                    sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
+                    Ok(sort_expr)
+                })
+                .collect::<Result<_>>()?;
+            new_orderings.push(new_ordering);
+        }
+
+        // Rewrite equivalence classes according to the new schema:
+        let mut eq_classes = vec![];
+        for eq_class in self.eq_group.classes {
+            let new_eq_exprs = eq_class
+                .into_vec()
+                .into_iter()
+                .map(|expr| with_new_schema(expr, &schema))
+                .collect::<Result<_>>()?;
+            eq_classes.push(EquivalenceClass::new(new_eq_exprs));
+        }
+
+        // Construct the resulting equivalence properties:
+        let mut result = EquivalenceProperties::new(schema);
+        result.constants = new_constants;
+        result.add_new_orderings(new_orderings);
+        result.add_equivalence_group(EquivalenceGroup::new(eq_classes));
+
+        Ok(result)
+    }
 }
 
 /// Calculates the properties of a given [`ExprPropertiesNode`].
@@ -1484,6 +1527,84 @@ impl Hash for ExprWrapper {
     }
 }
 
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of `lhs` and `rhs` according to the schema of `lhs`.
+fn calculate_union_binary(
+    lhs: EquivalenceProperties,
+    mut rhs: EquivalenceProperties,
+) -> Result<EquivalenceProperties> {
+    // TODO: In some cases, we should be able to preserve some equivalence
+    //       classes. Add support for such cases.
+
+    // Harmonize the schema of the rhs with the schema of the lhs (which is 
the accumulator schema):
+    if !rhs.schema.eq(&lhs.schema) {
+        rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?;
+    }
+
+    // First, calculate valid constants for the union. A quantity is constant
+    // after the union if it is constant in both sides.
+    let constants = lhs
+        .constants()
+        .iter()
+        .filter(|const_expr| const_exprs_contains(rhs.constants(), 
const_expr.expr()))
+        .map(|const_expr| {
+            // TODO: When both sides' constants are valid across partitions,
+            //       the union's constant should also be valid if values are
+            //       the same. However, we do not have the capability to
+            //       check this yet.
+            
ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
+        })
+        .collect();
+
+    // Next, calculate valid orderings for the union by searching for prefixes
+    // in both sides.
+    let mut orderings = vec![];
+    for mut ordering in lhs.normalized_oeq_class().orderings {
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !rhs.ordering_satisfy(&ordering) {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    for mut ordering in rhs.normalized_oeq_class().orderings {
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !lhs.ordering_satisfy(&ordering) {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    let mut eq_properties = EquivalenceProperties::new(lhs.schema);
+    eq_properties.constants = constants;
+    eq_properties.add_new_orderings(orderings);
+    Ok(eq_properties)
+}
+
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of the given `EquivalenceProperties` in `eqps` according to the given
+/// output `schema` (which need not be the same as those of the entries in
+/// `eqps`, as details such as nullability may be different).
+pub fn calculate_union(
+    eqps: Vec<EquivalenceProperties>,
+    schema: SchemaRef,
+) -> Result<EquivalenceProperties> {
+    // TODO: In some cases, we should be able to preserve some equivalence
+    //       classes. Add support for such cases.
+    let mut init = eqps[0].clone();
+    // Harmonize the schema of the init with the schema of the union:
+    if !init.schema.eq(&schema) {
+        init = init.with_new_schema(schema)?;
+    }
+    eqps.into_iter()
+        .skip(1)
+        .try_fold(init, calculate_union_binary)
+}
+
 #[cfg(test)]
 mod tests {
     use std::ops::Not;
@@ -2188,50 +2309,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_get_meet_ordering() -> Result<()> {
-        let schema = create_test_schema()?;
-        let col_a = &col("a", &schema)?;
-        let col_b = &col("b", &schema)?;
-        let eq_properties = EquivalenceProperties::new(schema);
-        let option_asc = SortOptions {
-            descending: false,
-            nulls_first: false,
-        };
-        let option_desc = SortOptions {
-            descending: true,
-            nulls_first: true,
-        };
-        let tests_cases = vec![
-            // Get meet ordering between [a ASC] and [a ASC, b ASC]
-            // result should be [a ASC]
-            (
-                vec![(col_a, option_asc)],
-                vec![(col_a, option_asc), (col_b, option_asc)],
-                Some(vec![(col_a, option_asc)]),
-            ),
-            // Get meet ordering between [a ASC] and [a DESC]
-            // result should be None.
-            (vec![(col_a, option_asc)], vec![(col_a, option_desc)], None),
-            // Get meet ordering between [a ASC, b ASC] and [a ASC, b DESC]
-            // result should be [a ASC].
-            (
-                vec![(col_a, option_asc), (col_b, option_asc)],
-                vec![(col_a, option_asc), (col_b, option_desc)],
-                Some(vec![(col_a, option_asc)]),
-            ),
-        ];
-        for (lhs, rhs, expected) in tests_cases {
-            let lhs = convert_to_sort_exprs(&lhs);
-            let rhs = convert_to_sort_exprs(&rhs);
-            let expected = expected.map(|expected| 
convert_to_sort_exprs(&expected));
-            let finer = eq_properties.get_meet_ordering(&lhs, &rhs);
-            assert_eq!(finer, expected)
-        }
-
-        Ok(())
-    }
-
     #[test]
     fn test_get_finer() -> Result<()> {
         let schema = create_test_schema()?;
@@ -2525,4 +2602,422 @@ mod tests {
 
         Ok(())
     }
+
+    fn append_fields(schema: &SchemaRef, text: &str) -> SchemaRef {
+        Arc::new(Schema::new(
+            schema
+                .fields()
+                .iter()
+                .map(|field| {
+                    Field::new(
+                        // Annotate name with `text`:
+                        format!("{}{}", field.name(), text),
+                        field.data_type().clone(),
+                        field.is_nullable(),
+                    )
+                })
+                .collect::<Vec<_>>(),
+        ))
+    }
+
+    #[tokio::test]
+    async fn test_union_equivalence_properties_multi_children() -> Result<()> {
+        let schema = create_test_schema()?;
+        let schema2 = append_fields(&schema, "1");
+        let schema3 = append_fields(&schema, "2");
+        let test_cases = vec![
+            // --------- TEST CASE 1 ----------
+            (
+                vec![
+                    // Children 1
+                    (
+                        // Orderings
+                        vec![vec!["a", "b", "c"]],
+                        Arc::clone(&schema),
+                    ),
+                    // Children 2
+                    (
+                        // Orderings
+                        vec![vec!["a1", "b1", "c1"]],
+                        Arc::clone(&schema2),
+                    ),
+                    // Children 3
+                    (
+                        // Orderings
+                        vec![vec!["a2", "b2"]],
+                        Arc::clone(&schema3),
+                    ),
+                ],
+                // Expected
+                vec![vec!["a", "b"]],
+            ),
+            // --------- TEST CASE 2 ----------
+            (
+                vec![
+                    // Children 1
+                    (
+                        // Orderings
+                        vec![vec!["a", "b", "c"]],
+                        Arc::clone(&schema),
+                    ),
+                    // Children 2
+                    (
+                        // Orderings
+                        vec![vec!["a1", "b1", "c1"]],
+                        Arc::clone(&schema2),
+                    ),
+                    // Children 3
+                    (
+                        // Orderings
+                        vec![vec!["a2", "b2", "c2"]],
+                        Arc::clone(&schema3),
+                    ),
+                ],
+                // Expected
+                vec![vec!["a", "b", "c"]],
+            ),
+            // --------- TEST CASE 3 ----------
+            (
+                vec![
+                    // Children 1
+                    (
+                        // Orderings
+                        vec![vec!["a", "b"]],
+                        Arc::clone(&schema),
+                    ),
+                    // Children 2
+                    (
+                        // Orderings
+                        vec![vec!["a1", "b1", "c1"]],
+                        Arc::clone(&schema2),
+                    ),
+                    // Children 3
+                    (
+                        // Orderings
+                        vec![vec!["a2", "b2", "c2"]],
+                        Arc::clone(&schema3),
+                    ),
+                ],
+                // Expected
+                vec![vec!["a", "b"]],
+            ),
+            // --------- TEST CASE 4 ----------
+            (
+                vec![
+                    // Children 1
+                    (
+                        // Orderings
+                        vec![vec!["a", "b"]],
+                        Arc::clone(&schema),
+                    ),
+                    // Children 2
+                    (
+                        // Orderings
+                        vec![vec!["a1", "b1"]],
+                        Arc::clone(&schema2),
+                    ),
+                    // Children 3
+                    (
+                        // Orderings
+                        vec![vec!["b2", "c2"]],
+                        Arc::clone(&schema3),
+                    ),
+                ],
+                // Expected
+                vec![],
+            ),
+            // --------- TEST CASE 5 ----------
+            (
+                vec![
+                    // Children 1
+                    (
+                        // Orderings
+                        vec![vec!["a", "b"], vec!["c"]],
+                        Arc::clone(&schema),
+                    ),
+                    // Children 2
+                    (
+                        // Orderings
+                        vec![vec!["a1", "b1"], vec!["c1"]],
+                        Arc::clone(&schema2),
+                    ),
+                ],
+                // Expected
+                vec![vec!["a", "b"], vec!["c"]],
+            ),
+        ];
+        for (children, expected) in test_cases {
+            let children_eqs = children
+                .iter()
+                .map(|(orderings, schema)| {
+                    let orderings = orderings
+                        .iter()
+                        .map(|ordering| {
+                            ordering
+                                .iter()
+                                .map(|name| PhysicalSortExpr {
+                                    expr: col(name, schema).unwrap(),
+                                    options: SortOptions::default(),
+                                })
+                                .collect::<Vec<_>>()
+                        })
+                        .collect::<Vec<_>>();
+                    EquivalenceProperties::new_with_orderings(
+                        Arc::clone(schema),
+                        &orderings,
+                    )
+                })
+                .collect::<Vec<_>>();
+            let actual = calculate_union(children_eqs, Arc::clone(&schema))?;
+
+            let expected_ordering = expected
+                .into_iter()
+                .map(|ordering| {
+                    ordering
+                        .into_iter()
+                        .map(|name| PhysicalSortExpr {
+                            expr: col(name, &schema).unwrap(),
+                            options: SortOptions::default(),
+                        })
+                        .collect::<Vec<_>>()
+                })
+                .collect::<Vec<_>>();
+            let expected = EquivalenceProperties::new_with_orderings(
+                Arc::clone(&schema),
+                &expected_ordering,
+            );
+            assert_eq_properties_same(
+                &actual,
+                &expected,
+                format!("expected: {:?}, actual: {:?}", expected, actual),
+            );
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_equivalence_properties_binary() -> Result<()> {
+        let schema = create_test_schema()?;
+        let schema2 = append_fields(&schema, "1");
+        let col_a = &col("a", &schema)?;
+        let col_b = &col("b", &schema)?;
+        let col_c = &col("c", &schema)?;
+        let col_a1 = &col("a1", &schema2)?;
+        let col_b1 = &col("b1", &schema2)?;
+        let options = SortOptions::default();
+        let options_desc = !SortOptions::default();
+        let test_cases = [
+            //-----------TEST CASE 1----------//
+            (
+                (
+                    // First child orderings
+                    vec![
+                        // [a ASC]
+                        (vec![(col_a, options)]),
+                    ],
+                    // First child constants
+                    vec![col_b, col_c],
+                    Arc::clone(&schema),
+                ),
+                (
+                    // Second child orderings
+                    vec![
+                        // [b ASC]
+                        (vec![(col_b, options)]),
+                    ],
+                    // Second child constants
+                    vec![col_a, col_c],
+                    Arc::clone(&schema),
+                ),
+                (
+                    // Union expected orderings
+                    vec![
+                        // [a ASC]
+                        vec![(col_a, options)],
+                        // [b ASC]
+                        vec![(col_b, options)],
+                    ],
+                    // Union expected constants
+                    vec![col_c],
+                ),
+            ),
+            //-----------TEST CASE 2----------//
+            // Meet ordering between [a ASC], [a ASC, b ASC] should be [a ASC]
+            (
+                (
+                    // First child orderings
+                    vec![
+                        // [a ASC]
+                        vec![(col_a, options)],
+                    ],
+                    // No constant
+                    vec![],
+                    Arc::clone(&schema),
+                ),
+                (
+                    // Second child orderings
+                    vec![
+                        // [a ASC, b ASC]
+                        vec![(col_a, options), (col_b, options)],
+                    ],
+                    // No constant
+                    vec![],
+                    Arc::clone(&schema),
+                ),
+                (
+                    // Union orderings
+                    vec![
+                        // [a ASC]
+                        vec![(col_a, options)],
+                    ],
+                    // No constant
+                    vec![],
+                ),
+            ),
+            //-----------TEST CASE 3----------//
+            // Meet ordering between [a ASC], [a DESC] should be []
+            (
+                (
+                    // First child orderings
+                    vec![
+                        // [a ASC]
+                        vec![(col_a, options)],
+                    ],
+                    // No constant
+                    vec![],
+                    Arc::clone(&schema),
+                ),
+                (
+                    // Second child orderings
+                    vec![
+                        // [a DESC]
+                        vec![(col_a, options_desc)],
+                    ],
+                    // No constant
+                    vec![],
+                    Arc::clone(&schema),
+                ),
+                (
+                    // Union doesn't have any ordering
+                    vec![],
+                    // No constant
+                    vec![],
+                ),
+            ),
+            //-----------TEST CASE 4----------//
+            // Meet ordering between [a ASC], [a1 ASC, b1 ASC] should be [a 
ASC]
+            // Where a and a1 are at the same index for their corresponding 
schemas.
+            (
+                (
+                    // First child orderings
+                    vec![
+                        // [a ASC]
+                        vec![(col_a, options)],
+                    ],
+                    // No constant
+                    vec![],
+                    Arc::clone(&schema),
+                ),
+                (
+                    // Second child orderings
+                    vec![
+                        // [a1 ASC, b1 ASC]
+                        vec![(col_a1, options), (col_b1, options)],
+                    ],
+                    // No constant
+                    vec![],
+                    Arc::clone(&schema2),
+                ),
+                (
+                    // Union orderings
+                    vec![
+                        // [a ASC]
+                        vec![(col_a, options)],
+                    ],
+                    // No constant
+                    vec![],
+                ),
+            ),
+        ];
+
+        for (
+            test_idx,
+            (
+                (first_child_orderings, first_child_constants, first_schema),
+                (second_child_orderings, second_child_constants, 
second_schema),
+                (union_orderings, union_constants),
+            ),
+        ) in test_cases.iter().enumerate()
+        {
+            let first_orderings = first_child_orderings
+                .iter()
+                .map(|ordering| convert_to_sort_exprs(ordering))
+                .collect::<Vec<_>>();
+            let first_constants = first_child_constants
+                .iter()
+                .map(|expr| ConstExpr::new(Arc::clone(expr)))
+                .collect::<Vec<_>>();
+            let mut lhs = EquivalenceProperties::new(Arc::clone(first_schema));
+            lhs = lhs.add_constants(first_constants);
+            lhs.add_new_orderings(first_orderings);
+
+            let second_orderings = second_child_orderings
+                .iter()
+                .map(|ordering| convert_to_sort_exprs(ordering))
+                .collect::<Vec<_>>();
+            let second_constants = second_child_constants
+                .iter()
+                .map(|expr| ConstExpr::new(Arc::clone(expr)))
+                .collect::<Vec<_>>();
+            let mut rhs = 
EquivalenceProperties::new(Arc::clone(second_schema));
+            rhs = rhs.add_constants(second_constants);
+            rhs.add_new_orderings(second_orderings);
+
+            let union_expected_orderings = union_orderings
+                .iter()
+                .map(|ordering| convert_to_sort_exprs(ordering))
+                .collect::<Vec<_>>();
+            let union_constants = union_constants
+                .iter()
+                .map(|expr| ConstExpr::new(Arc::clone(expr)))
+                .collect::<Vec<_>>();
+            let mut union_expected_eq = 
EquivalenceProperties::new(Arc::clone(&schema));
+            union_expected_eq = 
union_expected_eq.add_constants(union_constants);
+            union_expected_eq.add_new_orderings(union_expected_orderings);
+
+            let actual_union_eq = calculate_union_binary(lhs, rhs)?;
+            let err_msg = format!(
+                "Error in test id: {:?}, test case: {:?}",
+                test_idx, test_cases[test_idx]
+            );
+            assert_eq_properties_same(&actual_union_eq, &union_expected_eq, 
err_msg);
+        }
+        Ok(())
+    }
+
+    fn assert_eq_properties_same(
+        lhs: &EquivalenceProperties,
+        rhs: &EquivalenceProperties,
+        err_msg: String,
+    ) {
+        // Check whether constants are same
+        let lhs_constants = lhs.constants();
+        let rhs_constants = rhs.constants();
+        assert_eq!(lhs_constants.len(), rhs_constants.len(), "{}", err_msg);
+        for rhs_constant in rhs_constants {
+            assert!(
+                const_exprs_contains(lhs_constants, rhs_constant.expr()),
+                "{}",
+                err_msg
+            );
+        }
+
+        // Check whether orderings are same.
+        let lhs_orderings = lhs.oeq_class();
+        let rhs_orderings = &rhs.oeq_class.orderings;
+        assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg);
+        for rhs_ordering in rhs_orderings {
+            assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
+        }
+    }
 }
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index 4f83ae0195..2e78119eba 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -48,7 +48,7 @@ pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
 pub use datafusion_physical_expr_common::aggregate::{
     AggregateExpr, AggregatePhysicalExpressions,
 };
-pub use equivalence::{ConstExpr, EquivalenceProperties};
+pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties};
 pub use partitioning::{Distribution, Partitioning};
 pub use physical_expr::{
     physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
diff --git a/datafusion/physical-plan/src/common.rs 
b/datafusion/physical-plan/src/common.rs
index bf9d14e73d..4b5eea6b76 100644
--- a/datafusion/physical-plan/src/common.rs
+++ b/datafusion/physical-plan/src/common.rs
@@ -22,9 +22,9 @@ use std::fs::{metadata, File};
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use super::{ExecutionPlanProperties, SendableRecordBatchStream};
+use super::SendableRecordBatchStream;
 use crate::stream::RecordBatchReceiverStream;
-use crate::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::{ColumnStatistics, Statistics};
 
 use arrow::datatypes::Schema;
 use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
@@ -33,8 +33,6 @@ use arrow_array::Array;
 use datafusion_common::stats::Precision;
 use datafusion_common::{plan_err, DataFusionError, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
-use datafusion_physical_expr::expressions::{BinaryExpr, Column};
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 
 use futures::{StreamExt, TryStreamExt};
 use parking_lot::Mutex;
@@ -178,71 +176,6 @@ pub fn compute_record_batch_statistics(
     }
 }
 
-/// Calculates the "meet" of given orderings.
-/// The meet is the finest ordering that satisfied by all the given
-/// orderings, see <https://en.wikipedia.org/wiki/Join_and_meet>.
-pub fn get_meet_of_orderings(
-    given: &[Arc<dyn ExecutionPlan>],
-) -> Option<&[PhysicalSortExpr]> {
-    given
-        .iter()
-        .map(|item| item.output_ordering())
-        .collect::<Option<Vec<_>>>()
-        .and_then(get_meet_of_orderings_helper)
-}
-
-fn get_meet_of_orderings_helper(
-    orderings: Vec<&[PhysicalSortExpr]>,
-) -> Option<&[PhysicalSortExpr]> {
-    let mut idx = 0;
-    let first = orderings[0];
-    loop {
-        for ordering in orderings.iter() {
-            if idx >= ordering.len() {
-                return Some(ordering);
-            } else {
-                let schema_aligned = check_expr_alignment(
-                    ordering[idx].expr.as_ref(),
-                    first[idx].expr.as_ref(),
-                );
-                if !schema_aligned || (ordering[idx].options != first[idx].options) {
-                    // In a union, the output schema is that of the first child (by convention).
-                    // Therefore, generate the result from the first child's schema:
-                    return if idx > 0 { Some(&first[..idx]) } else { None };
-                }
-            }
-        }
-        idx += 1;
-    }
-
-    fn check_expr_alignment(first: &dyn PhysicalExpr, second: &dyn PhysicalExpr) -> bool {
-        match (
-            first.as_any().downcast_ref::<Column>(),
-            second.as_any().downcast_ref::<Column>(),
-            first.as_any().downcast_ref::<BinaryExpr>(),
-            second.as_any().downcast_ref::<BinaryExpr>(),
-        ) {
-            (Some(first_col), Some(second_col), _, _) => {
-                first_col.index() == second_col.index()
-            }
-            (_, _, Some(first_binary), Some(second_binary)) => {
-                if first_binary.op() == second_binary.op() {
-                    check_expr_alignment(
-                        first_binary.left().as_ref(),
-                        second_binary.left().as_ref(),
-                    ) && check_expr_alignment(
-                        first_binary.right().as_ref(),
-                        second_binary.right().as_ref(),
-                    )
-                } else {
-                    false
-                }
-            }
-            (_, _, _, _) => false,
-        }
-    }
-}
-
 /// Write in Arrow IPC format.
 pub struct IPCWriter {
     /// path
@@ -342,297 +275,12 @@ pub fn can_project(
 
 #[cfg(test)]
 mod tests {
-    use std::ops::Not;
-
     use super::*;
-    use crate::memory::MemoryExec;
-    use crate::sorts::sort::SortExec;
-    use crate::union::UnionExec;
 
-    use arrow::compute::SortOptions;
     use arrow::{
         array::{Float32Array, Float64Array, UInt64Array},
         datatypes::{DataType, Field},
     };
-    use datafusion_expr::Operator;
-    use datafusion_physical_expr::expressions::col;
-
-    #[test]
-    fn get_meet_of_orderings_helper_common_prefix_test() -> Result<()> {
-        let input1: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("c", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input2: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("x", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("y", 1)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("z", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input3: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("d", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("e", 1)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("f", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input4: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("g", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("h", 1)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                // Note that index of this column is not 2. Hence this 3rd entry shouldn't be
-                // in the output ordering.
-                expr: Arc::new(Column::new("i", 3)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let expected = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("c", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
-        assert_eq!(result.unwrap(), expected);
-
-        let expected = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
-                options: SortOptions::default(),
-            },
-        ];
-        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input4]);
-        assert_eq!(result.unwrap(), expected);
-        Ok(())
-    }
-
-    #[test]
-    fn get_meet_of_orderings_helper_subset_test() -> Result<()> {
-        let input1: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input2: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("c", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("d", 1)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("e", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input3: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("f", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("g", 1)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("h", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]);
-        assert_eq!(result.unwrap(), input1);
-        Ok(())
-    }
-
-    #[test]
-    fn get_meet_of_orderings_helper_no_overlap_test() -> Result<()> {
-        let input1: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 0)),
-                // Since ordering is conflicting with other inputs
-                // output ordering should be empty
-                options: SortOptions::default().not(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("b", 1)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input2: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("x", 0)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 1)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input3: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("a", 2)),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("y", 1)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
-        assert!(result.is_none());
-
-        let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
-        assert!(result.is_none());
-
-        let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
-        assert!(result.is_none());
-        Ok(())
-    }
-
-    #[test]
-    fn get_meet_of_orderings_helper_binary_exprs() -> Result<()> {
-        let input1: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(BinaryExpr::new(
-                    Arc::new(Column::new("a", 0)),
-                    Operator::Plus,
-                    Arc::new(Column::new("b", 1)),
-                )),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("c", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let input2: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(BinaryExpr::new(
-                    Arc::new(Column::new("x", 0)),
-                    Operator::Plus,
-                    Arc::new(Column::new("y", 1)),
-                )),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("z", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        // erroneous input
-        let input3: Vec<PhysicalSortExpr> = vec![
-            PhysicalSortExpr {
-                expr: Arc::new(BinaryExpr::new(
-                    Arc::new(Column::new("a", 1)),
-                    Operator::Plus,
-                    Arc::new(Column::new("b", 0)),
-                )),
-                options: SortOptions::default(),
-            },
-            PhysicalSortExpr {
-                expr: Arc::new(Column::new("c", 2)),
-                options: SortOptions::default(),
-            },
-        ];
-
-        let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
-        assert_eq!(input1, result.unwrap());
-
-        let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
-        assert!(result.is_none());
-
-        let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
-        assert!(result.is_none());
-        Ok(())
-    }
-
-    #[test]
-    fn test_meet_of_orderings() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("f32", DataType::Float32, false),
-            Field::new("f64", DataType::Float64, false),
-        ]));
-        let sort_expr = vec![PhysicalSortExpr {
-            expr: col("f32", &schema).unwrap(),
-            options: SortOptions::default(),
-        }];
-        let memory_exec =
-            Arc::new(MemoryExec::try_new(&[], Arc::clone(&schema), None)?) as _;
-        let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec))
-            as Arc<dyn ExecutionPlan>;
-        let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
-        // memory_exec2 doesn't have output ordering
-        let union_exec = UnionExec::new(vec![Arc::clone(&sort_exec), memory_exec2]);
-        let res = get_meet_of_orderings(union_exec.inputs());
-        assert!(res.is_none());
-
-        let union_exec = UnionExec::new(vec![Arc::clone(&sort_exec), sort_exec]);
-        let res = get_meet_of_orderings(union_exec.inputs());
-        assert_eq!(res, Some(&sort_expr[..]));
-        Ok(())
-    }
 
     #[test]
     fn test_compute_record_batch_statistics_empty() -> Result<()> {
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 24c80048ab..9321fdb2ca 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::stats::Precision;
 use datafusion_common::{exec_err, internal_err, Result};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
+use datafusion_physical_expr::{calculate_union, EquivalenceProperties};
 
 use futures::Stream;
 use itertools::Itertools;
@@ -99,7 +99,12 @@ impl UnionExec {
     /// Create a new UnionExec
     pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
         let schema = union_schema(&inputs);
-        let cache = Self::compute_properties(&inputs, schema);
+        // The schema of the inputs and the union schema is consistent when:
+        // - They have the same number of fields, and
+        // - Their fields have same types at the same indices.
+        // Here, we know that schemas are consistent and the call below can
+        // not return an error.
+        let cache = Self::compute_properties(&inputs, schema).unwrap();
         UnionExec {
             inputs,
             metrics: ExecutionPlanMetricsSet::new(),
@@ -116,13 +121,13 @@ impl UnionExec {
     fn compute_properties(
         inputs: &[Arc<dyn ExecutionPlan>],
         schema: SchemaRef,
-    ) -> PlanProperties {
+    ) -> Result<PlanProperties> {
         // Calculate equivalence properties:
-        let children_eqs = inputs
+        let children_eqps = inputs
             .iter()
-            .map(|child| child.equivalence_properties())
+            .map(|child| child.equivalence_properties().clone())
             .collect::<Vec<_>>();
-        let eq_properties = calculate_union_eq_properties(&children_eqs, schema);
+        let eq_properties = calculate_union(children_eqps, schema)?;
 
         // Calculate output partitioning; i.e. sum output partitions of the inputs.
         let num_partitions = inputs
@@ -134,71 +139,13 @@ impl UnionExec {
         // Determine execution mode:
         let mode = execution_mode_from_children(inputs.iter());
 
-        PlanProperties::new(eq_properties, output_partitioning, mode)
+        Ok(PlanProperties::new(
+            eq_properties,
+            output_partitioning,
+            mode,
+        ))
     }
 }
-/// Calculate `EquivalenceProperties` for `UnionExec` from the `EquivalenceProperties`
-/// of its children.
-fn calculate_union_eq_properties(
-    children_eqs: &[&EquivalenceProperties],
-    schema: SchemaRef,
-) -> EquivalenceProperties {
-    // Calculate equivalence properties:
-    // TODO: In some cases, we should be able to preserve some equivalence
-    //       classes and constants. Add support for such cases.
-    let mut eq_properties = EquivalenceProperties::new(schema);
-    // Use the ordering equivalence class of the first child as the seed:
-    let mut meets = children_eqs[0]
-        .oeq_class()
-        .iter()
-        .map(|item| item.to_vec())
-        .collect::<Vec<_>>();
-    // Iterate over all the children:
-    for child_eqs in &children_eqs[1..] {
-        // Compute meet orderings of the current meets and the new ordering
-        // equivalence class.
-        let mut idx = 0;
-        while idx < meets.len() {
-            // Find all the meets of `current_meet` with this child's orderings:
-            let valid_meets = child_eqs.oeq_class().iter().filter_map(|ordering| {
-                child_eqs.get_meet_ordering(ordering, &meets[idx])
-            });
-            // Use the longest of these meets as others are redundant:
-            if let Some(next_meet) = valid_meets.max_by_key(|m| m.len()) {
-                meets[idx] = next_meet;
-                idx += 1;
-            } else {
-                meets.swap_remove(idx);
-            }
-        }
-    }
-    // We know have all the valid orderings after union, remove redundant
-    // entries (implicitly) and return:
-    eq_properties.add_new_orderings(meets);
-
-    let mut meet_constants = children_eqs[0].constants().to_vec();
-    // Iterate over all the children:
-    for child_eqs in &children_eqs[1..] {
-        let constants = child_eqs.constants();
-        meet_constants = meet_constants
-            .into_iter()
-            .filter_map(|meet_constant| {
-                for const_expr in constants {
-                    if const_expr.expr().eq(meet_constant.expr()) {
-                        // TODO: Check whether constant expressions evaluates the same value or not for each partition
-                        let across_partitions = false;
-                        return Some(
-                            ConstExpr::from(meet_constant.owned_expr())
-                                .with_across_partitions(across_partitions),
-                        );
-                    }
-                }
-                None
-            })
-            .collect::<Vec<_>>();
-    }
-    eq_properties.add_constants(meet_constants)
-}
 
 impl DisplayAs for UnionExec {
     fn fmt_as(
@@ -639,8 +586,8 @@ mod tests {
 
     use arrow_schema::{DataType, SortOptions};
     use datafusion_common::ScalarValue;
-    use datafusion_physical_expr::expressions::col;
     use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+    use datafusion_physical_expr_common::expressions::column::col;
 
     // Generate a schema which consists of 7 columns (a, b, c, d, e, f, g)
     fn create_test_schema() -> Result<SchemaRef> {
@@ -856,23 +803,31 @@ mod tests {
                     .with_sort_information(second_orderings),
             );
 
+            let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema));
+            union_expected_eq.add_new_orderings(union_expected_orderings);
+
             let union = UnionExec::new(vec![child1, child2]);
             let union_eq_properties = union.properties().equivalence_properties();
-            let union_actual_orderings = union_eq_properties.oeq_class();
             let err_msg = format!(
                 "Error in test id: {:?}, test case: {:?}",
                 test_idx, test_cases[test_idx]
             );
-            assert_eq!(
-                union_actual_orderings.len(),
-                union_expected_orderings.len(),
-                "{}",
-                err_msg
-            );
-            for expected in &union_expected_orderings {
-                assert!(union_actual_orderings.contains(expected), "{}", err_msg);
-            }
+            assert_eq_properties_same(union_eq_properties, &union_expected_eq, err_msg);
         }
         Ok(())
     }
+
+    fn assert_eq_properties_same(
+        lhs: &EquivalenceProperties,
+        rhs: &EquivalenceProperties,
+        err_msg: String,
+    ) {
+        // Check whether orderings are same.
+        let lhs_orderings = lhs.oeq_class();
+        let rhs_orderings = &rhs.oeq_class.orderings;
+        assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg);
+        for rhs_ordering in rhs_orderings {
+            assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
+        }
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt
index 51de40fb19..1aeaf9b76d 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -1132,3 +1132,10 @@ physical_plan
 02)--ProjectionExec: expr=[CAST(inc_col@0 > desc_col@1 AS Int32) as c]
 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[inc_col, desc_col], output_orderings=[[inc_col@0 ASC NULLS LAST], [desc_col@1 DESC]], has_header=true
+
+# Union a query with the actual data and one with a constant
+query I
+SELECT (SELECT c from ordered_table ORDER BY c LIMIT 1) UNION ALL (SELECT 23 as c from ordered_table ORDER BY c LIMIT 1) ORDER BY c;
+----
+0
+23


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to