mustafasrepo commented on code in PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#discussion_r1098275745
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -775,6 +1187,133 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_remove_unnecessary_sort2() -> Result<()> {
+ let schema = create_test_schema()?;
+ let source = memory_exec(&schema);
+ let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+ let sort = sort_exec(sort_exprs.clone(), source);
+ let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+ let sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let sort2 = sort_exec(sort_exprs.clone(), spm);
+ let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+ let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let sort3 = sort_exec(sort_exprs, spm2);
+ let physical_plan = repartition_exec(repartition_exec(sort3));
+
+ let expected_input = vec![
+ "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: [nullable_col@0 ASC]",
+ " SortPreservingMergeExec: [nullable_col@0
ASC,non_nullable_col@1 ASC]",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+ " SortExec: [non_nullable_col@1 ASC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+
+ let expected_optimized = vec![
+ "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_remove_unnecessary_sort3() -> Result<()> {
+ let schema = create_test_schema()?;
+ let source = memory_exec(&schema);
+ let sort_exprs = vec![sort_expr("non_nullable_col", &schema)];
+ let sort = sort_exec(sort_exprs.clone(), source);
+ let spm = sort_preserving_merge_exec(sort_exprs, sort);
+
+ let sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let repartition_exec = repartition_exec(spm);
+ let sort2 = sort_exec(sort_exprs.clone(), repartition_exec);
+ let spm2 = sort_preserving_merge_exec(sort_exprs, sort2);
+
+ let physical_plan = aggregate_exec(spm2);
+
+ // When removing a `SortPreservingMergeExec`, make sure that
partitioning
+ // requirements are not violated. In some cases, we may need to replace
+ // it with a `CoalescePartitionsExec` instead of directly removing it.
+ let expected_input = vec![
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ " SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortPreservingMergeExec: [non_nullable_col@1 ASC]",
+ " SortExec: [non_nullable_col@1 ASC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+
+ let expected_optimized = vec![
+ "AggregateExec: mode=Final, gby=[], aggr=[]",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_do_not_remove_sort_with_limit() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let source1 = parquet_exec(&schema);
+ let sort_exprs = vec![
+ sort_expr("nullable_col", &schema),
+ sort_expr("non_nullable_col", &schema),
+ ];
+ let sort = sort_exec(sort_exprs.clone(), source1);
+ let limit = local_limit_exec(sort);
+ let limit = global_limit_exec(limit);
+
+ let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
+
+ let union = union_exec(vec![source2, limit]);
+ let repartition = repartition_exec(union);
+ let physical_plan = sort_preserving_merge_exec(sort_exprs,
repartition);
+
+ let expected_input = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " UnionExec",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " LocalLimitExec: fetch=100",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
projection=[nullable_col, non_nullable_col]",
+ ];
+
+ // We should keep the bottom `SortExec`.
+ let expected_optimized = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " UnionExec",
+ " ParquetExec: limit=None, partitions={1 group: [[x]]},
output_ordering=[nullable_col@0 ASC], projection=[nullable_col,
non_nullable_col]",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " LocalLimitExec: fetch=100",
+ " SortExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
+ " ParquetExec: limit=None, partitions={1 group:
[[x]]}, projection=[nullable_col, non_nullable_col]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
#[tokio::test]
Review Comment:
We could separate the enforcement functionality into its own pass and then
optimize away unnecessary Sorts in a second pass. However, the optimization
code would be exactly the same as this one, since removing a Sort may
invalidate the ordering requirements of the layers above, and we would then
need to add the necessary Sort back. In that case, the first enforcement pass
would be redundant. Hence, separating them doesn't help. However, I will think
it through in detail. Maybe we can find a way to accomplish this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]