This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a1e526137 Add new tests demonstrating pipeline breaking can be fixed with the help of SortPreservingRepartitionExec (#6953)
8a1e526137 is described below

commit 8a1e52613704461d0bcdeefea693ae67ff990520
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Jul 14 13:14:06 2023 +0300

    Add new tests demonstrating pipeline breaking can be fixed with the help of SortPreservingRepartitionExec (#6953)
---
 .../src/physical_optimizer/sort_enforcement.rs     | 65 ++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 4587aadc68..61d423165e 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -2762,6 +2762,67 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_with_lost_ordering_bounded() -> Result<()> {
+        let schema = create_test_schema3()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, false);
+        let repartition_rr = repartition_exec(source);
+        let repartition_hash = Arc::new(RepartitionExec::try_new(
+            repartition_rr,
+            Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
+        )?) as _;
+        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
+        let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
+
+        let expected_input = vec![
+            "SortExec: expr=[a@0 ASC]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], has_header=false",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC]",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC], has_header=false",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_with_lost_ordering_unbounded() -> Result<()> {
+        let schema = create_test_schema3()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let repartition_rr = repartition_exec(source);
+        let repartition_hash = Arc::new(RepartitionExec::try_new(
+            repartition_rr,
+            Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
+        )?) as _;
+        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
+        let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);
+
+        let expected_input = vec![
+            "SortExec: expr=[a@0 ASC]",
+            "  CoalescePartitionsExec",
+            "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+            "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
+            "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        Ok(())
+    }
+
     /// make PhysicalSortExpr with default options
     fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
         sort_expr_options(name, schema, SortOptions::default())
@@ -2918,6 +2979,10 @@ mod tests {
         )
     }
 
+    fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        Arc::new(CoalescePartitionsExec::new(input))
+    }
+
     fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
         let schema = input.schema();
         Arc::new(

Reply via email to