ozankabak commented on code in PR #5661:
URL: https://github.com/apache/arrow-datafusion/pull/5661#discussion_r1155147305


##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -1669,25 +1842,378 @@ mod tests {
         let sort2 = sort_exec(sort_exprs3.clone(), source2);
 
         let union = union_exec(vec![sort1, sort2]);
-        let physical_plan = window_exec("nullable_col", sort_exprs3, union);
+        let spm = sort_preserving_merge_exec(sort_exprs3.clone(), union);
+        let physical_plan = bounded_window_exec("nullable_col", sort_exprs3, spm);
 
         // The `WindowAggExec` gets its sorting from multiple children jointly.
         // During the removal of `SortExec`s, it should be able to remove the
         // corresponding SortExecs together. Also, the inputs of these `SortExec`s
         // are not necessarily the same to be able to remove them.
         let expected_input = vec![
-            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
-            "  UnionExec",
-            "    SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+            "  SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
+            "    UnionExec",
+            "      SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+            "      SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "    UnionExec",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
-            "    SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
             "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
         ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_window_multi_path_sort2() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![sort_expr("nullable_col", &schema)];
+        let source1 = parquet_exec_sorted(&schema, sort_exprs2.clone());
+        let source2 = parquet_exec_sorted(&schema, sort_exprs2.clone());
+        let sort1 = sort_exec(sort_exprs1.clone(), source1);
+        let sort2 = sort_exec(sort_exprs1.clone(), source2);
+
+        let union = union_exec(vec![sort1, sort2]);
+        let spm = Arc::new(SortPreservingMergeExec::new(sort_exprs1, union)) as _;
+        let physical_plan = bounded_window_exec("nullable_col", sort_exprs2, spm);
+
+        // The `WindowAggExec` can get its required sorting from the leaf nodes directly.
+        // The unnecessary SortExecs should be removed
+        let expected_input = vec![
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "    UnionExec",
+            "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "        ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+        ];
         let expected_optimized = vec![
-            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
+            "  SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "    UnionExec",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_union_inputs_different_sorted_with_limit() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let source1 = parquet_exec(&schema);
+        let sort_exprs1 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr_options(
+                "non_nullable_col",
+                &schema,
+                SortOptions {
+                    descending: true,
+                    nulls_first: false,
+                },
+            ),
+        ];
+        let sort_exprs3 = vec![sort_expr("nullable_col", &schema)];
+        let sort1 = sort_exec(sort_exprs1, source1.clone());
+
+        let sort2 = sort_exec(sort_exprs2, source1);
+        let limit = local_limit_exec(sort2);
+        let limit = global_limit_exec(limit);
+
+        let union = union_exec(vec![sort1, limit]);
+        let physical_plan = sort_preserving_merge_exec(sort_exprs3, union);
+
+        // Should not change the unnecessarily fine `SortExec`s because there is `LimitExec`
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  UnionExec",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
-            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    GlobalLimitExec: skip=0, fetch=100",
+            "      LocalLimitExec: fetch=100",
+            "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
+            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC]",
+            "  UnionExec",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    GlobalLimitExec: skip=0, fetch=100",
+            "      LocalLimitExec: fetch=100",
+            "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 DESC NULLS LAST]",
+            "          ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sort_merge_join_order_by_left() -> Result<()> {
+        let left_schema = create_test_schema()?;
+        let right_schema = create_test_schema2()?;
+
+        let left = parquet_exec(&left_schema);
+        let right = parquet_exec(&right_schema);
+
+        // Join on (nullable_col == col_a)
+        let join_on = vec![(
+            Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+            Column::new_with_schema("col_a", &right.schema()).unwrap(),
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+        ];
+        for join_type in join_types {
+            let join =
+                sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type);
+            let sort_exprs = vec![
+                sort_expr("nullable_col", &join.schema()),
+                sort_expr("non_nullable_col", &join.schema()),
+            ];
+            let physical_plan = sort_preserving_merge_exec(sort_exprs.clone(), join);
+
+            let join_plan =
+                format!("SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+            let join_plan2 =
+                format!("  SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+            let expected_input = vec![
+                "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+                join_plan2.as_str(),
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+            ];
+            let expected_optimized = match join_type {
+                JoinType::Inner
+                | JoinType::Left
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti => {
+                    // can push down the sort requirements and save 1 SortExec
+                    vec![
+                        join_plan.as_str(),
+                        "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "  SortExec: expr=[col_a@0 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+                _ => {
+                    // can not push down the sort requirements
+                    vec![
+                        "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+                        join_plan2.as_str(),
+                        "    SortExec: expr=[nullable_col@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "    SortExec: expr=[col_a@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+            };
+            assert_optimized!(expected_input, expected_optimized, physical_plan);
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sort_merge_join_order_by_right() -> Result<()> {
+        let left_schema = create_test_schema()?;
+        let right_schema = create_test_schema2()?;
+
+        let left = parquet_exec(&left_schema);
+        let right = parquet_exec(&right_schema);
+
+        // Join on (nullable_col == col_a)
+        let join_on = vec![(
+            Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+            Column::new_with_schema("col_a", &right.schema()).unwrap(),
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::RightAnti,
+        ];
+        for join_type in join_types {
+            let join =
+                sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type);
+            let sort_exprs = vec![
+                sort_expr("col_a", &join.schema()),
+                sort_expr("col_b", &join.schema()),
+            ];
+            let physical_plan = sort_preserving_merge_exec(sort_exprs, join);
+
+            let join_plan =
+                format!("SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+            let spm_plan = match join_type {
+                JoinType::RightAnti => {
+                    "SortPreservingMergeExec: [col_a@0 ASC,col_b@1 ASC]"
+                }
+                _ => "SortPreservingMergeExec: [col_a@2 ASC,col_b@3 ASC]",
+            };
+            let join_plan2 =
+                format!("  SortMergeJoin: join_type={join_type}, on=[(Column {{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+            let expected_input = vec![
+                spm_plan,
+                join_plan2.as_str(),
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+            ];
+            let expected_optimized = match join_type {
+                JoinType::Inner | JoinType::Right | JoinType::RightAnti => {
+                    // can push down the sort requirements and save 1 SortExec
+                    vec![
+                        join_plan.as_str(),
+                        "  SortExec: expr=[nullable_col@0 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "  SortExec: expr=[col_a@0 ASC,col_b@1 ASC]",
+                        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+                _ => {
+                    // can not push down the sort requirements for Left and Full join.
+                    vec![
+                        "SortExec: expr=[col_a@2 ASC,col_b@3 ASC]",
+                        join_plan2.as_str(),
+                        "    SortExec: expr=[nullable_col@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+                        "    SortExec: expr=[col_a@0 ASC]",
+                        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+                    ]
+                }
+            };
+            assert_optimized!(expected_input, expected_optimized, physical_plan);
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_sort_merge_join_complex_order_by() -> Result<()> {
+        let left_schema = create_test_schema()?;
+        let right_schema = create_test_schema2()?;
+
+        let left = parquet_exec(&left_schema);
+        let right = parquet_exec(&right_schema);
+
+        // Join on (nullable_col == col_a)
+        let join_on = vec![(
+            Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+            Column::new_with_schema("col_a", &right.schema()).unwrap(),
+        )];
+
+        let join = sort_merge_join_exec(left, right, &join_on, &JoinType::Inner);
+
+        // order by (col_b, col_a)
+        let sort_exprs1 = vec![
+            sort_expr("col_b", &join.schema()),
+            sort_expr("col_a", &join.schema()),
+        ];
+        let physical_plan = sort_preserving_merge_exec(sort_exprs1, join.clone());
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [col_b@3 ASC,col_a@2 ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+        ];
+
+        // can not push down the sort requirements, need to add SortExec
+        let expected_optimized = vec![
+            "SortExec: expr=[col_b@3 ASC,col_a@2 ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[col_a@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+
+        // order by (nullable_col, col_b, col_a)
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &join.schema()),
+            sort_expr("col_b", &join.schema()),
+            sort_expr("col_a", &join.schema()),
+        ];
+        let physical_plan = sort_preserving_merge_exec(sort_exprs2, join);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+        ];
+
+        // can not push down the sort requirements, need to add SortExec
+        let expected_optimized = vec![
+            "SortExec: expr=[nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
+            "  SortMergeJoin: join_type=Inner, on=[(Column { name: \"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+            "    SortExec: expr=[nullable_col@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+            "    SortExec: expr=[col_a@0 ASC]",
+            "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[col_a, col_b]",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_multiple_sort_window_exec() -> Result<()> {
+        let schema = create_test_schema()?;
+        let source = memory_exec(&schema);
+
+        let sort_exprs1 = vec![sort_expr("nullable_col", &schema)];
+        let sort_exprs2 = vec![
+            sort_expr("nullable_col", &schema),
+            sort_expr("non_nullable_col", &schema),
+        ];
+
+        let sort1 = sort_exec(sort_exprs1.clone(), source);
+        let window_agg1 =
+            bounded_window_exec("non_nullable_col", sort_exprs1.clone(), sort1);
+        let window_agg2 =
+            bounded_window_exec("non_nullable_col", sort_exprs2, window_agg1);
+        // let filter_exec = sort_exec;
+        let physical_plan =
+            bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);
+
+        let expected_input = vec![

Review Comment:
   Thanks for the pointer, we will take a look at it 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to