mingmwang commented on code in PR #5035:
URL: https://github.com/apache/arrow-datafusion/pull/5035#discussion_r1090214417
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -816,6 +817,74 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_union_inputs_different_sorted() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let sort = sort_exec(sort_exprs.clone(), source1);
+
+ let parquet_sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+ let union = union_exec(vec![source2, sort]);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+ // one input to the union is already sorted, one is not.
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC]",
+ "  UnionExec",
+ "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+ "    SortExec: [nullable_col@0 ASC]",
+ "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ ];
+ // should not add a sort at the output of the union, input plan should not be changed
+ let expected_optimized = expected_input.clone();
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_union_inputs_different_sorted2() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let sort = sort_exec(sort_exprs.clone(), source1);
+
+ let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+ let union = union_exec(vec![source2, sort]);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+ // one input to the union is already sorted, one is not.
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ "  UnionExec",
+ "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ ];
+ // should remove unnecessary sorting from below and move it to top
+ let expected_optimized = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
Review Comment:
I think the current implementation cannot always produce an optimal plan.
For example:
```rust
#[tokio::test]
async fn test_union_inputs_different_sorted3() -> Result<()> {
    let schema = create_test_schema()?;

    let source1 = parquet_exec(&schema);
    let sort_exprs1 = vec![sort_expr("nullable_col", &schema)];
    let sort_exprs2 = vec![
        sort_expr("nullable_col", &schema),
        sort_expr("non_nullable_col", &schema),
    ];
    let sort = sort_exec(sort_exprs1.clone(), source1);

    let source2 = parquet_exec_sorted(&schema, sort_exprs2.clone());

    let union = union_exec(vec![source2, sort]);
    let physical_plan = sort_preserving_merge_exec(sort_exprs2, union);

    // Input is an invalid plan. In this case the rule should add the required
    // sorting in the appropriate places.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "  UnionExec",
        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
        "    SortExec: [nullable_col@0 ASC]",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    ];
    // Expect the wrong SortExec to be replaced with the correct one.
    let expected_optimized = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "  UnionExec",
        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
        "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
```
Instead of removing the original wrong `SortExec` and adding a new
`SortExec` on top of the `UnionExec`, the optimal plan should replace
the wrong `SortExec` in place with a correct one, so that only the input that
actually lacks the required ordering is sorted.
Also, PostgreSQL 13 added an optimization called incremental sort. We might
add a similar optimization in the future, and the sort adding/removing logic
will need to be revisited then.
https://postgrespro.com/blog/pgsql/5969770#:~:text=This%20method%20is%20called%20incremental,whole%20set%20to%20be%20processed.
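To illustrate the idea, here is a minimal sketch of an incremental sort, assuming rows are `(prefix, suffix)` integer pairs that already arrive ordered by `prefix`. The function name `incremental_sort` and the tuple layout are hypothetical and only for illustration; this is neither PostgreSQL's nor DataFusion's implementation.

```rust
/// Sorts `rows` by `(prefix, suffix)`, assuming the input is already
/// ordered by `prefix` (hypothetical helper, for illustration only).
///
/// Only runs of equal `prefix` values are sorted, so a streaming operator
/// built on the same idea would need to buffer one run at a time rather
/// than the whole input.
fn incremental_sort(rows: &mut [(i32, i32)]) {
    let mut start = 0;
    while start < rows.len() {
        let key = rows[start].0;
        // Find the end of the run that shares the same prefix key.
        let mut end = start + 1;
        while end < rows.len() && rows[end].0 == key {
            end += 1;
        }
        // Sort only this run by the remaining key.
        rows[start..end].sort_by_key(|row| row.1);
        start = end;
    }
}
```

Applied to the plan above, an input that is already ordered by `nullable_col` would only need each group of equal `nullable_col` values ordered by `non_nullable_col`, instead of a full `SortExec`.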
The current EnforceSorting rule is already very complex. I think one
reason is that the optimization is a post-order (bottom-up) traversal, and a
bottom-up approach usually struggles to produce an optimal plan because it
lacks enough context information. Specifically, the rule tries to both add and
remove sorts in a single traversal pass, which is hard to get right. I will try
to refactor and reimplement the rule with a top-down approach. I hope this will
simplify the code and the framework and make them easier to understand.
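As a rough sketch of what a top-down pass could look like, the toy model below pushes the required ordering from the parent toward the leaves and replaces an insufficient sort in place. The `Plan` enum, `enforce` function, and string column names are all hypothetical stand-ins, not DataFusion's `ExecutionPlan` API, and the model ignores partitioning, sort direction, and nulls ordering.

```rust
/// Toy plan tree (hypothetical; not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf scan with a known output ordering (empty if unsorted).
    Scan { ordering: Vec<String> },
    Sort { keys: Vec<String>, input: Box<Plan> },
    Union { inputs: Vec<Plan> },
    /// Sort-preserving merge: requires its input to be ordered by `keys`.
    Merge { keys: Vec<String>, input: Box<Plan> },
}

/// Rewrites `plan` so that its output is ordered by `required`,
/// pushing the requirement down toward the leaves.
fn enforce(plan: Plan, required: &[String]) -> Plan {
    match plan {
        Plan::Merge { keys, input } => {
            // The merge itself defines the requirement for its subtree.
            let input = enforce(*input, &keys);
            Plan::Merge { keys, input: Box::new(input) }
        }
        Plan::Union { inputs } => Plan::Union {
            // Every union input must satisfy the parent's requirement.
            inputs: inputs.into_iter().map(|p| enforce(p, required)).collect(),
        },
        Plan::Sort { keys, input } => {
            if keys.starts_with(required) {
                // The existing sort is sufficient: keep it unchanged and
                // impose no ordering requirement on what lies below it.
                let input = enforce(*input, &[]);
                Plan::Sort { keys, input: Box::new(input) }
            } else {
                // The existing sort is insufficient: drop it and let the
                // subtree re-derive the correct sort in the same place.
                enforce(*input, required)
            }
        }
        Plan::Scan { ordering } => {
            if ordering.starts_with(required) {
                Plan::Scan { ordering }
            } else {
                Plan::Sort {
                    keys: required.to_vec(),
                    input: Box::new(Plan::Scan { ordering }),
                }
            }
        }
    }
}
```

On a tree shaped like the example plan (a merge on two columns over a union of a fully sorted scan and a one-column sort over an unsorted scan), this sketch leaves the sorted scan untouched and replaces only the one-column sort with a two-column one, because each node sees its parent's requirement before deciding.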