alamb commented on code in PR #5661:
URL: https://github.com/apache/arrow-datafusion/pull/5661#discussion_r1155137871
##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -249,6 +249,17 @@ impl ExecutionPlan for SortMergeJoinExec {
self.output_ordering.as_deref()
}
+ fn maintains_input_order(&self) -> Vec<bool> {
+ match self.join_type {
+ JoinType::Inner => vec![true, true],
+ JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti =>
vec![true, false],
Review Comment:
the `false` here is because NULLs can be injected arbitrarily into the
stream, messing up the sort order — right?
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -1669,25 +1842,378 @@ mod tests {
let sort2 = sort_exec(sort_exprs3.clone(), source2);
let union = union_exec(vec![sort1, sort2]);
- let physical_plan = window_exec("nullable_col", sort_exprs3, union);
+ let spm = sort_preserving_merge_exec(sort_exprs3.clone(), union);
+ let physical_plan = bounded_window_exec("nullable_col", sort_exprs3,
spm);
// The `WindowAggExec` gets its sorting from multiple children jointly.
// During the removal of `SortExec`s, it should be able to remove the
// corresponding SortExecs together. Also, the inputs of these
`SortExec`s
// are not necessarily the same to be able to remove them.
let expected_input = vec![
- "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound:
CurrentRow }]",
- " UnionExec",
- " SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }]",
+ " SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
+ " UnionExec",
+ " SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC],
projection=[nullable_col, non_nullable_col]",
+ " SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ ];
+ let expected_optimized = vec![
+ "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound:
Following(NULL) }]",
+ " SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC],
projection=[nullable_col, non_nullable_col]",
- " SortExec: expr=[nullable_col@0 DESC NULLS LAST]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_window_multi_path_sort2() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let sort_exprs1 = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let sort_exprs2 = vec![sort_expr("nullable_col", &schema)];
+ let source1 = parquet_exec_sorted(&schema, sort_exprs2.clone());
+ let source2 = parquet_exec_sorted(&schema, sort_exprs2.clone());
+ let sort1 = sort_exec(sort_exprs1.clone(), source1);
+ let sort2 = sort_exec(sort_exprs1.clone(), source2);
+
+ let union = union_exec(vec![sort1, sort2]);
+ let spm = Arc::new(SortPreservingMergeExec::new(sort_exprs1, union))
as _;
+ let physical_plan = bounded_window_exec("nullable_col", sort_exprs2,
spm);
+
+ // The `WindowAggExec` can get its required sorting from the leaf
nodes directly.
+ // The unnecessary SortExecs should be removed
+ let expected_input = vec![
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }]",
+ " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
+ " UnionExec",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ ];
let expected_optimized = vec![
- "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound:
Following(NULL) }]",
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }]",
+ " SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " UnionExec",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_union_inputs_different_sorted_with_limit() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs1 = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let sort_exprs2 = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr_options(
+ "non_nullable_col",
+ &schema,
+ SortOptions {
+ descending: true,
+ nulls_first: false,
+ },
+ ),
+ ];
+ let sort_exprs3 = vec![sort_expr("nullable_col", &schema)];
+ let sort1 = sort_exec(sort_exprs1, source1.clone());
+
+ let sort2 = sort_exec(sort_exprs2, source1);
+ let limit = local_limit_exec(sort2);
+ let limit = global_limit_exec(limit);
+
+ let union = union_exec(vec![sort1, limit]);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs3, union);
+
+ // Should not change the unnecessarily fine `SortExec`s because there
is `LimitExec`
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
- " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC],
projection=[nullable_col, non_nullable_col]",
- " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " LocalLimitExec: fetch=100",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1
DESC NULLS LAST]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ ];
+ let expected_optimized = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " UnionExec",
+ " SortExec: expr=[nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " LocalLimitExec: fetch=100",
+ " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1
DESC NULLS LAST]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_sort_merge_join_order_by_left() -> Result<()> {
+ let left_schema = create_test_schema()?;
+ let right_schema = create_test_schema2()?;
+
+ let left = parquet_exec(&left_schema);
+ let right = parquet_exec(&right_schema);
+
+ // Join on (nullable_col == col_a)
+ let join_on = vec![(
+ Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+ Column::new_with_schema("col_a", &right.schema()).unwrap(),
+ )];
+
+ let join_types = vec![
+ JoinType::Inner,
+ JoinType::Left,
+ JoinType::Right,
+ JoinType::Full,
+ JoinType::LeftSemi,
+ JoinType::LeftAnti,
+ ];
+ for join_type in join_types {
+ let join =
+ sort_merge_join_exec(left.clone(), right.clone(), &join_on,
&join_type);
+ let sort_exprs = vec![
+ sort_expr("nullable_col", &join.schema()),
+ sort_expr("non_nullable_col", &join.schema()),
+ ];
+ let physical_plan = sort_preserving_merge_exec(sort_exprs.clone(),
join);
+
+ let join_plan =
+ format!("SortMergeJoin: join_type={join_type}, on=[(Column {{
name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+ let join_plan2 =
+ format!(" SortMergeJoin: join_type={join_type}, on=[(Column
{{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0
}})]");
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0
ASC,non_nullable_col@1 ASC]",
+ join_plan2.as_str(),
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[col_a, col_b]",
+ ];
+ let expected_optimized = match join_type {
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti => {
+ // can push down the sort requirements and save 1 SortExec
+ vec![
+ join_plan.as_str(),
+ " SortExec: expr=[nullable_col@0
ASC,non_nullable_col@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[nullable_col, non_nullable_col]",
+ " SortExec: expr=[col_a@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[col_a, col_b]",
+ ]
+ }
+ _ => {
+ // can not push down the sort requirements
+ vec![
+ "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1
ASC]",
+ join_plan2.as_str(),
+ " SortExec: expr=[nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[nullable_col, non_nullable_col]",
+ " SortExec: expr=[col_a@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[col_a, col_b]",
+ ]
+ }
+ };
+ assert_optimized!(expected_input, expected_optimized,
physical_plan);
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_sort_merge_join_order_by_right() -> Result<()> {
+ let left_schema = create_test_schema()?;
+ let right_schema = create_test_schema2()?;
+
+ let left = parquet_exec(&left_schema);
+ let right = parquet_exec(&right_schema);
+
+ // Join on (nullable_col == col_a)
+ let join_on = vec![(
+ Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+ Column::new_with_schema("col_a", &right.schema()).unwrap(),
+ )];
+
+ let join_types = vec![
+ JoinType::Inner,
+ JoinType::Left,
+ JoinType::Right,
+ JoinType::Full,
+ JoinType::RightAnti,
+ ];
+ for join_type in join_types {
+ let join =
+ sort_merge_join_exec(left.clone(), right.clone(), &join_on,
&join_type);
+ let sort_exprs = vec![
+ sort_expr("col_a", &join.schema()),
+ sort_expr("col_b", &join.schema()),
+ ];
+ let physical_plan = sort_preserving_merge_exec(sort_exprs, join);
+
+ let join_plan =
+ format!("SortMergeJoin: join_type={join_type}, on=[(Column {{
name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0 }})]");
+ let spm_plan = match join_type {
+ JoinType::RightAnti => {
+ "SortPreservingMergeExec: [col_a@0 ASC,col_b@1 ASC]"
+ }
+ _ => "SortPreservingMergeExec: [col_a@2 ASC,col_b@3 ASC]",
+ };
+ let join_plan2 =
+ format!(" SortMergeJoin: join_type={join_type}, on=[(Column
{{ name: \"nullable_col\", index: 0 }}, Column {{ name: \"col_a\", index: 0
}})]");
+ let expected_input = vec![
+ spm_plan,
+ join_plan2.as_str(),
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[col_a, col_b]",
+ ];
+ let expected_optimized = match join_type {
+ JoinType::Inner | JoinType::Right | JoinType::RightAnti => {
+ // can push down the sort requirements and save 1 SortExec
+ vec![
+ join_plan.as_str(),
+ " SortExec: expr=[nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[nullable_col, non_nullable_col]",
+ " SortExec: expr=[col_a@0 ASC,col_b@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[col_a, col_b]",
+ ]
+ }
+ _ => {
+ // can not push down the sort requirements for Left and
Full join.
+ vec![
+ "SortExec: expr=[col_a@2 ASC,col_b@3 ASC]",
+ join_plan2.as_str(),
+ " SortExec: expr=[nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[nullable_col, non_nullable_col]",
+ " SortExec: expr=[col_a@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[col_a, col_b]",
+ ]
+ }
+ };
+ assert_optimized!(expected_input, expected_optimized,
physical_plan);
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_sort_merge_join_complex_order_by() -> Result<()> {
+ let left_schema = create_test_schema()?;
+ let right_schema = create_test_schema2()?;
+
+ let left = parquet_exec(&left_schema);
+ let right = parquet_exec(&right_schema);
+
+ // Join on (nullable_col == col_a)
+ let join_on = vec![(
+ Column::new_with_schema("nullable_col", &left.schema()).unwrap(),
+ Column::new_with_schema("col_a", &right.schema()).unwrap(),
+ )];
+
+ let join = sort_merge_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ // order by (col_b, col_a)
+ let sort_exprs1 = vec![
+ sort_expr("col_b", &join.schema()),
+ sort_expr("col_a", &join.schema()),
+ ];
+ let physical_plan = sort_preserving_merge_exec(sort_exprs1,
join.clone());
+
+ let expected_input = vec![
+ "SortPreservingMergeExec: [col_b@3 ASC,col_a@2 ASC]",
+ " SortMergeJoin: join_type=Inner, on=[(Column { name:
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[col_a, col_b]",
+ ];
+
+ // can not push down the sort requirements, need to add SortExec
+ let expected_optimized = vec![
+ "SortExec: expr=[col_b@3 ASC,col_a@2 ASC]",
+ " SortMergeJoin: join_type=Inner, on=[(Column { name:
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+ " SortExec: expr=[nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " SortExec: expr=[col_a@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[col_a, col_b]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+
+ // order by (nullable_col, col_b, col_a)
+ let sort_exprs2 = vec![
+ sort_expr("nullable_col", &join.schema()),
+ sort_expr("col_b", &join.schema()),
+ sort_expr("col_a", &join.schema()),
+ ];
+ let physical_plan = sort_preserving_merge_exec(sort_exprs2, join);
+
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,col_b@3 ASC,col_a@2
ASC]",
+ " SortMergeJoin: join_type=Inner, on=[(Column { name:
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[col_a, col_b]",
+ ];
+
+ // can not push down the sort requirements, need to add SortExec
+ let expected_optimized = vec![
+ "SortExec: expr=[nullable_col@0 ASC,col_b@3 ASC,col_a@2 ASC]",
+ " SortMergeJoin: join_type=Inner, on=[(Column { name:
\"nullable_col\", index: 0 }, Column { name: \"col_a\", index: 0 })]",
+ " SortExec: expr=[nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ " SortExec: expr=[col_a@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[col_a, col_b]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_multiple_sort_window_exec() -> Result<()> {
+ let schema = create_test_schema()?;
+ let source = memory_exec(&schema);
+
+ let sort_exprs1 = vec![sort_expr("nullable_col", &schema)];
+ let sort_exprs2 = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+
+ let sort1 = sort_exec(sort_exprs1.clone(), source);
+ let window_agg1 =
+ bounded_window_exec("non_nullable_col", sort_exprs1.clone(),
sort1);
+ let window_agg2 =
+ bounded_window_exec("non_nullable_col", sort_exprs2, window_agg1);
+ // let filter_exec = sort_exec;
+ let physical_plan =
+ bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);
+
+ let expected_input = vec![
Review Comment:
All of these tests must have taken substantial amounts of time to write and
to update the expected output.
One thing we started doing in IOx was to use https://insta.rs/ (where
basically you can do `cargo insta review` and the tool will update the inlined
expected output for you). I don't have time to lead another refactoring project
now (I am still trying to finish up sqllogictests) but using insta might help
accelerate velocity
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -146,19 +151,11 @@ impl PlanWithCorrespondingSort {
return None;
}
let is_spm = is_sort_preserving_merge(plan);
- let is_union = plan.as_any().is::<UnionExec>();
Review Comment:
I like the idea of separating the "enforce sort requirements" from the
"optimize sort requirements", which I think this PR improves.
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -1432,22 +1514,22 @@ mod tests {
let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
// Input is an invalid plan. In this case rule should add required
sorting in appropriate places.
- // First ParquetExec has output ordering(nullable_col@0 ASC). However,
it doesn't satisfy required ordering
- // of SortPreservingMergeExec. Hence rule should remove unnecessary
sort for second child of the UnionExec
- // and put a sort above Union to satisfy required ordering.
+ // First ParquetExec has output ordering(nullable_col@0 ASC). However,
it doesn't satisfy the
+ // required ordering of SortPreservingMergeExec.
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
];
- // should remove unnecessary sorting from below and move it to top
+
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
- " SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " UnionExec",
+ " UnionExec",
Review Comment:
This plan seems to have gone "in reverse" (as in the test says that the sort
should be removed) -- is that ok?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]