alamb commented on code in PR #5035:
URL: https://github.com/apache/arrow-datafusion/pull/5035#discussion_r1091314823
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -816,6 +817,74 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_union_inputs_different_sorted() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let sort = sort_exec(sort_exprs.clone(), source1);
+
+ let parquet_sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+ let union = union_exec(vec![source2, sort]);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+ // one input to the union is already sorted, one is not.
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " UnionExec",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
+ " SortExec: [nullable_col@0 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ ];
+ // should not add a sort at the output of the union, input plan should not be changed
+ let expected_optimized = expected_input.clone();
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_union_inputs_different_sorted2() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let sort = sort_exec(sort_exprs.clone(), source1);
+
+ let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+ let union = union_exec(vec![source2, sort]);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs, union);
+
+ // one input to the union is already sorted, one is not.
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " UnionExec",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
+ ];
+ // should remove unnecessary sorting from below and move it to top
+ let expected_optimized = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
Review Comment:
related PR: https://github.com/apache/arrow-datafusion/pull/5111
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org