alamb commented on code in PR #4885:
URL: https://github.com/apache/arrow-datafusion/pull/4885#discussion_r1068148858
##########
datafusion/core/tests/sql/explain_analyze.rs:
##########
@@ -90,7 +90,7 @@ async fn explain_analyze_baseline_metrics() {
assert_metrics!(
&formatted,
"CoalescePartitionsExec",
- "metrics=[output_rows=3, elapsed_compute="
+ "metrics=[output_rows=5, elapsed_compute="
Review Comment:
this is due to the plan changing in the same way as explained in
`datafusion/core/tests/sql/window.rs`
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2290,30 +2290,32 @@ async fn test_remove_unnecessary_sort_in_sub_query() ->
Result<()> {
let dataframe = ctx.sql(sql).await.expect(&msg);
let physical_plan = dataframe.create_physical_plan().await?;
let formatted = displayable(physical_plan.as_ref()).indent().to_string();
+
// Unnecessary Sort in the sub query is removed
- let expected = {
- vec![
- "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
- " AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
- " CoalescePartitionsExec",
- " AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]",
- " RepartitionExec: partitioning=RoundRobinBatch(8)",
Review Comment:
Using https://github.com/apache/arrow-datafusion/pull/4885/files?w=1 is
easiest to see what changed -- the final group by with no group exprs is no
longer done in parallel which I think will actually improve performance
##########
datafusion/core/src/physical_optimizer/repartition.rs:
##########
@@ -583,9 +624,94 @@ mod tests {
}
#[test]
- fn repartition_does_not_repartition_transitively() -> Result<()> {
+ fn repartition_ignores_sort_preserving_merge() -> Result<()> {
+ // sort preserving merge already sorted input,
+ let plan = sort_preserving_merge_exec(parquet_exec_sorted());
+
+ // should not repartition / sort (as the data was already sorted)
+ let expected = &[
+ "SortPreservingMergeExec: [c1@0 ASC]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[c1@0 ASC], projection=[c1]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
+
+ #[test]
+ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
+ // 2 sorted parquet files unioned (partitions are concatenated, sort
is preserved)
+ let input = union_exec(vec![parquet_exec_sorted(); 2]);
+ let plan = sort_preserving_merge_exec(input);
+
+ // should not repartition / sort (as the data was already sorted)
+ let expected = &[
+ "SortPreservingMergeExec: [c1@0 ASC]",
+ "UnionExec",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[c1@0 ASC], projection=[c1]",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[c1@0 ASC], projection=[c1]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
+
+ #[test]
+ fn repartition_does_not_destroy_sort() -> Result<()> {
+ // SortRequired
+ // Parquet(sorted)
+
+ let plan = sort_required_exec(parquet_exec_sorted());
+
+ // should not repartition as doing so destroys the necessary sort order
+ let expected = &[
+ "SortRequiredExec",
+ "ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[c1@0 ASC], projection=[c1]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
+
+ #[test]
+ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> {
+ // model a more complicated scenario where one child of a union can be
repartitioned for performance
+ // but the other can not be
+ //
Review Comment:
This plan is close to what we have in IOx
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]