alamb commented on code in PR #5228:
URL: https://github.com/apache/arrow-datafusion/pull/5228#discussion_r1103625562
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -1776,6 +1745,47 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_coalesce_propagate() -> Result<()> {
+ let schema = create_test_schema()?;
+ let source = memory_exec(&schema);
+ let repartition = repartition_exec(source);
+ let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition));
+ let repartition = repartition_exec(coalesce_partitions);
+ let sort_exprs = vec![sort_expr("nullable_col", &schema)];
+ // Add local sort
+ let sort = Arc::new(SortExec::new_with_partitioning(
+ sort_exprs.clone(),
+ repartition,
+ true,
+ None,
+ )) as _;
+ let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
+ let sort = sort_exec(sort_exprs, spm);
+
+ let physical_plan = sort.clone();
+ // Sort Parallelize rule should end Coalesce + Sort linkage when Sort is Global Sort
+ // Also input plan is not valid as it is. We need to add SortExec before SortPreservingMergeExec.
+ let expected_input = vec![
+ "SortExec: [nullable_col@0 ASC]",
+ " SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " SortExec: [nullable_col@0 ASC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " CoalescePartitionsExec",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+ let expected_optimized = vec![
+ "SortPreservingMergeExec: [nullable_col@0 ASC]",
+ " SortExec: [nullable_col@0 ASC]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
+ " MemoryExec: partitions=0, partition_sizes=[]",
Review Comment:
This plan definitely looks better than the input.
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -1718,8 +1687,8 @@ mod tests {
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
- " FilterExec: NOT non_nullable_col@1",
- " SortExec: [nullable_col@0 ASC]",
+ " SortExec: [nullable_col@0 ASC]",
+ " FilterExec: NOT non_nullable_col@1",
Review Comment:
This plan change looks better to me as well (it does the filtering before the sort).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]