alamb commented on code in PR #5035:
URL: https://github.com/apache/arrow-datafusion/pull/5035#discussion_r1085246895


##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -220,55 +220,55 @@ impl ExecutionPlan for UnionExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        let first_input_ordering = self.inputs[0].output_ordering();
-        // If the Union is not partition aware and all the input ordering spec 
strictly equal with the first_input_ordering
-        // Return the first_input_ordering as the output_ordering
-        //
-        // It might be too strict here in the case that the input ordering are 
compatible but not exactly the same.
-        // For example one input ordering has the ordering spec 
SortExpr('a','b','c') and the other has the ordering
-        // spec SortExpr('a'), It is safe to derive the out ordering with the 
spec SortExpr('a').
-        if !self.partition_aware
-            && first_input_ordering.is_some()
-            && self
-                .inputs
-                .iter()
-                .map(|plan| plan.output_ordering())
-                .all(|ordering| {
-                    ordering.is_some()
-                        && sort_expr_list_eq_strict_order(
-                            ordering.unwrap(),
-                            first_input_ordering.unwrap(),
-                        )
-                })
-        {
-            first_input_ordering
+        // If the Union is partition aware, there is no output ordering.
+        // Otherwise, the output ordering is the "meet" of its input orderings.
+        // The meet is the finest ordering that satisfied by all the input
+        // orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
+        if self.partition_aware {
+            return None;
+        }
+        // To find the meet, we first find the smallest input ordering.
+        let mut smallest: Option<&[PhysicalSortExpr]> = None;

Review Comment:
   This is awesome -- thank you @mustafasrepo. We don't have to make this more 
sophisticated yet, but I think longer term we should be planning on it. This 
algorithm is key to sort-based optimizations (not just Union).
   
   Thus I suggest:
   1. Pull the code to find "the covering sort key" for a set of sort keys into 
its own function -- perhaps also in datafusion/physical-expr/src/utils.rs
   2. Add some basic unit tests (that we can expand as this logic gets more 
sophisticated)
   
   I also vaguely remember code like this to find "compatible" orderings as 
part of the initial sort / partition enforcement -- perhaps @mingmwang  
remembers?
   
   We also have a version of this code in IOx, I think: 
https://github.com/influxdata/influxdb_iox/blob/bcb1232c5d2810fee16533c0d23c33e0f4bc2493/schema/src/sort.rs#L232-L297
   
   



##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -220,55 +220,55 @@ impl ExecutionPlan for UnionExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        let first_input_ordering = self.inputs[0].output_ordering();
-        // If the Union is not partition aware and all the input ordering spec 
strictly equal with the first_input_ordering
-        // Return the first_input_ordering as the output_ordering
-        //
-        // It might be too strict here in the case that the input ordering are 
compatible but not exactly the same.
-        // For example one input ordering has the ordering spec 
SortExpr('a','b','c') and the other has the ordering
-        // spec SortExpr('a'), It is safe to derive the out ordering with the 
spec SortExpr('a').
-        if !self.partition_aware
-            && first_input_ordering.is_some()
-            && self
-                .inputs
-                .iter()
-                .map(|plan| plan.output_ordering())
-                .all(|ordering| {
-                    ordering.is_some()
-                        && sort_expr_list_eq_strict_order(
-                            ordering.unwrap(),
-                            first_input_ordering.unwrap(),
-                        )
-                })
-        {
-            first_input_ordering
+        // If the Union is partition aware, there is no output ordering.
+        // Otherwise, the output ordering is the "meet" of its input orderings.
+        // The meet is the finest ordering that satisfied by all the input
+        // orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
+        if self.partition_aware {
+            return None;
+        }
+        // To find the meet, we first find the smallest input ordering.
+        let mut smallest: Option<&[PhysicalSortExpr]> = None;
+        for item in self.inputs.iter() {
+            if let Some(ordering) = item.output_ordering() {
+                smallest = match smallest {
+                    None => Some(ordering),
+                    Some(expr) if ordering.len() < expr.len() => 
Some(ordering),
+                    _ => continue,
+                }
+            } else {
+                return None;
+            }
+        }
+        // Check if the smallest ordering is a meet or not:
+        if self.inputs.iter().all(|child| {
+            ordering_satisfy(child.output_ordering(), smallest, || {
+                child.equivalence_properties()
+            })
+        }) {
+            smallest
         } else {
             None
         }
     }
 
-    fn maintains_input_order(&self) -> bool {
-        let first_input_ordering = self.inputs[0].output_ordering();
-        // If the Union is not partition aware and all the input
-        // ordering spec strictly equal with the first_input_ordering,
-        // then the `UnionExec` maintains the input order
-        //
-        // It might be too strict here in the case that the input
-        // ordering are compatible but not exactly the same.  See
-        // comments in output_ordering
-        !self.partition_aware
-            && first_input_ordering.is_some()
-            && self
-                .inputs
-                .iter()
-                .map(|plan| plan.output_ordering())
-                .all(|ordering| {
-                    ordering.is_some()
-                        && sort_expr_list_eq_strict_order(
-                            ordering.unwrap(),
-                            first_input_ordering.unwrap(),
-                        )
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // If the Union has an output ordering, it maintains at least one
+        // child's ordering (i.e. the meet).
+        // For instance, assume that the first child is SortExpr('a','b','c'),

Review Comment:
   ❤️ 



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -816,6 +817,74 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+
+        let parquet_sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, sort]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+        // one input to the union is already sorted, one is not.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        // should not add a sort at the output of the union, input plan should 
not be changed
+        let expected_optimized = expected_input.clone();
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted2() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, sort]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+        // one input to the union is already sorted, one is not.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "  UnionExec",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC], projection=[nullable_col, 
non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        // should remove unnecessary sorting from below and move it to top
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
+            "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",

Review Comment:
   I don't think this is a better plan -- specifically, now *both* files are 
sorted (after unioning them together), whereas in the input only *one* file was 
sorted.



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -816,6 +817,74 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+
+        let parquet_sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, sort]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+        // one input to the union is already sorted, one is not.
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, 
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], 
projection=[nullable_col, non_nullable_col]",
+            "    SortExec: [nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col]",
+        ];
+        // should not add a sort at the output of the union, input plan should 
not be changed
+        let expected_optimized = expected_input.clone();
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted2() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort = sort_exec(sort_exprs.clone(), source1);
+
+        let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+        let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+        let union = union_exec(vec![source2, sort]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+        // one input to the union is already sorted, one is not.

Review Comment:
   ```suggestion
           // both inputs to the union are already sorted
   ```



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -108,13 +107,18 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
             let sort_onwards = children_requirements
                 .iter()
                 .map(|item| {
-                    if item.sort_onwards.is_empty() {
-                        vec![]
-                    } else {
-                        // TODO: When `maintains_input_order` returns 
Vec<bool>,
-                        //       pass the order-enforcing sort upwards.
-                        item.sort_onwards[0].clone()
+                    let onwards = &item.sort_onwards;
+                    if !onwards.is_empty() {
+                        let is_sort = item.plan.as_any().is::<SortExec>();

Review Comment:
   Why do we need to check for `is_sort`?



##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -108,13 +107,18 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
             let sort_onwards = children_requirements
                 .iter()
                 .map(|item| {
-                    if item.sort_onwards.is_empty() {
-                        vec![]
-                    } else {
-                        // TODO: When `maintains_input_order` returns 
Vec<bool>,

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to