ozankabak commented on code in PR #5035:
URL: https://github.com/apache/arrow-datafusion/pull/5035#discussion_r1090231056
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -816,6 +817,74 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_union_inputs_different_sorted() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let sort = sort_exec(sort_exprs.clone(), source1);
+
+ let parquet_sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+ let union = union_exec(vec![source2, sort]);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+ // one input to the union is already sorted, one is not.
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " UnionExec",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+ " SortExec: [nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ ];
+ // should not add a sort at the output of the union, input plan should not be changed
+ let expected_optimized = expected_input.clone();
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_union_inputs_different_sorted2() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let sort = sort_exec(sort_exprs.clone(), source1);
+
+ let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+ let union = union_exec(vec![source2, sort]);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+ // one input to the union is already sorted, one is not.
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " UnionExec",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ ];
+ // should remove unnecessary sorting from below and move it to top
+ let expected_optimized = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
Review Comment:
I think trying a top-down approach to make the same optimizations, and seeing
whether it simplifies the code, is a good idea.
With [the PR we are working on](https://github.com/synnada-ai/arrow-datafusion/pull/43),
we will fix cases like your example and make a few more optimizations. We will
also add a bunch of tests to verify each optimization, so that future refactors
are easier to do and don't lose any of these optimizations.
We should be done with it very soon; you can leverage the test/verification
collateral we will be adding to verify the refactor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.