This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8a1e526137 Add new tests demonstrating pipeline breaking can be fixed with the help of SortPreservingRepartitionExec (#6953)
8a1e526137 is described below
commit 8a1e52613704461d0bcdeefea693ae67ff990520
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Jul 14 13:14:06 2023 +0300
Add new tests demonstrating pipeline breaking can be fixed with the help of SortPreservingRepartitionExec (#6953)
---
.../src/physical_optimizer/sort_enforcement.rs | 65 ++++++++++++++++++++++
1 file changed, 65 insertions(+)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 4587aadc68..61d423165e 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -2762,6 +2762,67 @@ mod tests {
Ok(())
}
+    /// Sorted but *bounded* source whose ordering is lost by a hash repartition
+    /// followed by `CoalescePartitionsExec` + `SortExec`. The expected optimized
+    /// plan sorts each partition with a `SortExec` under a
+    /// `SortPreservingMergeExec` instead of sorting after coalescing.
+    #[tokio::test]
+    async fn test_with_lost_ordering_bounded() -> Result<()> {
+        let schema = create_test_schema3()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        // `false`: finite source (no `infinite_source=true` in the expected CsvExec line).
+        let source = csv_exec_sorted(&schema, sort_exprs, false);
+        let repartition_rr = repartition_exec(source);
+        let repartition_hash = Arc::new(RepartitionExec::try_new(
+            repartition_rr,
+            Partitioning::Hash(vec![col("c", &schema)?], 10),
+        )?) as _;
+        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
+        let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
+
+        // Each expected line is one string literal; every tree level is indented
+        // two spaces deeper than its parent.
+        let expected_input = vec![
+            "SortExec: expr=[a@0 ASC]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=false",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=false",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    /// Same plan shape as the bounded case, but the sorted source is unbounded
+    /// (`infinite_source=true`). A `SortExec` would break the pipeline here, so the
+    /// expected optimized plan keeps the existing `a ASC` ordering by turning the
+    /// hash `RepartitionExec` into a `SortPreservingRepartitionExec` under a
+    /// `SortPreservingMergeExec`, with no `SortExec` at all.
+    #[tokio::test]
+    async fn test_with_lost_ordering_unbounded() -> Result<()> {
+        let schema = create_test_schema3()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        // `true`: unbounded source (shows up as `infinite_source=true` below).
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec(source);
+        let repartition_hash = Arc::new(RepartitionExec::try_new(
+            repartition_rr,
+            Partitioning::Hash(vec![col("c", &schema)?], 10),
+        )?) as _;
+        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
+        let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
+
+        // Each expected line is one string literal; every tree level is indented
+        // two spaces deeper than its parent.
+        let expected_input = vec![
+            "SortExec: expr=[a@0 ASC]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
@@ -2918,6 +2979,10 @@ mod tests {
)
}
+    /// Test helper: places a `CoalescePartitionsExec` node on top of `input`
+    /// and returns it as a shared `ExecutionPlan` trait object.
+    fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        let coalesced = CoalescePartitionsExec::new(input);
+        Arc::new(coalesced)
+    }
+
fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan>
{
let schema = input.schema();
Arc::new(