alamb commented on a change in pull request #1776:
URL: https://github.com/apache/arrow-datafusion/pull/1776#discussion_r801054089



##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -178,38 +335,46 @@ mod tests {
             .collect()
     }
 
+    /// Runs the repartition optimizer and asserts the plan against the 
expected
+    macro_rules! assert_optimized {
+        ($EXPECTED_LINES: expr, $PLAN: expr) => {
+            let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| 
*s).collect();
+
+            // run optimizer
+            let optimizer = Repartition {};
+            let optimized = optimizer
+                .optimize($PLAN, 
&ExecutionConfig::new().with_target_partitions(10))?;
+
+            // Now format correctly
+            let plan = displayable(optimized.as_ref()).indent().to_string();
+            let actual_lines = trim_plan_display(&plan);
+
+            assert_eq!(
+                &expected_lines, &actual_lines,
+                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+                expected_lines, actual_lines
+            );
+        };
+    }
+
     #[test]
     fn added_repartition_to_single_partition() -> Result<()> {
-        let optimizer = Repartition {};
+        let plan = hash_aggregate(parquet_exec());

Review comment:
       I cleaned up the tests here to reduce the ceremony of invoking the optimizer. The expected plans are all unchanged.

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
     }
 }
 
+/// Recursively visits all `plan`s puts and then optionally adds a
+/// `RepartitionExec` at the output of `plan` to match
+/// `target_partitions`
+///
+/// if `can_reorder` is false, means that the output of this node
+/// can not be reordered as as something upstream is relying on that order
+///
+/// If 'would_benefit` is false, the upstream operator doesn't
+///  benefit from additional reordering
+///
 fn optimize_partitions(
     target_partitions: usize,
     plan: Arc<dyn ExecutionPlan>,
-    should_repartition: bool,
+    can_reorder: bool,

Review comment:
       Here is the change to the repartition logic so that it does not repartition when doing so would produce incorrect answers.

##########
File path: datafusion/src/physical_plan/analyze.rs
##########
@@ -82,6 +83,10 @@ impl ExecutionPlan for AnalyzeExec {
         Partitioning::UnknownPartitioning(1)
     }
 
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {

Review comment:
       Having to sprinkle `output_ordering` around was annoying -- but I think it may be worth it to avoid some nasty bugs.

##########
File path: datafusion/src/physical_plan/limit.rs
##########
@@ -300,11 +335,6 @@ impl ExecutionPlan for LocalLimitExec {
             _ => Statistics::default(),
         }
     }
-
-    fn should_repartition_children(&self) -> bool {

Review comment:
       This is effectively renamed to `benefits_from_input_partitioning`.

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -219,20 +384,64 @@ mod tests {
             "ParquetExec: limit=None, partitions=[x]",
         ];
 
-        assert_eq!(&trim_plan_display(&plan), &expected);
+        assert_optimized!(expected, plan);
         Ok(())
     }
 
     #[test]
-    fn repartition_ignores_limit() -> Result<()> {
-        let optimizer = Repartition {};
+    fn repartition_unsorted_limit() -> Result<()> {
+        let plan = limit_exec(filter_exec(parquet_exec()));
+
+        let expected = &[
+            "GlobalLimitExec: limit=100",
+            "LocalLimitExec: limit=100",
+            "FilterExec: c1@0",
+            // nothing sorts the data, so the local limit doesn't require 
sorted data either
+            "RepartitionExec: partitioning=RoundRobinBatch(10)",
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
+
+        assert_optimized!(expected, plan);
+        Ok(())
+    }
+
+    #[test]
+    fn repartition_sorted_limit() -> Result<()> {
+        let plan = limit_exec(sort_exec(parquet_exec()));
+
+        let expected = &[
+            "GlobalLimitExec: limit=100",
+            "LocalLimitExec: limit=100",
+            // data is sorted so can't repartition here
+            "SortExec: [c1@0 ASC]",
+            "ParquetExec: limit=None, partitions=[x]",
+        ];
+
+        assert_optimized!(expected, plan);
+        Ok(())
+    }
+
+    #[test]
+    fn repartition_sorted_limit_with_filter() -> Result<()> {
+        let plan = limit_exec(filter_exec(sort_exec(parquet_exec())));
 
-        let optimized = optimizer.optimize(
-            
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))),
-            &ExecutionConfig::new().with_target_partitions(10),
-        )?;
+        let expected = &[
+            "GlobalLimitExec: limit=100",
+            "LocalLimitExec: limit=100",
+            "FilterExec: c1@0",
+            // data is sorted so can't repartition here even though

Review comment:
       However, once you put a sort here, repartitioning can't happen without risking incorrect results.

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -219,20 +384,64 @@ mod tests {
             "ParquetExec: limit=None, partitions=[x]",
         ];
 
-        assert_eq!(&trim_plan_display(&plan), &expected);
+        assert_optimized!(expected, plan);
         Ok(())
     }
 
     #[test]
-    fn repartition_ignores_limit() -> Result<()> {
-        let optimizer = Repartition {};
+    fn repartition_unsorted_limit() -> Result<()> {

Review comment:
       New plans showing that data isn't repartitioned below limits if sorts are present.

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -147,24 +147,59 @@ pub trait ExecutionPlan: Debug + Send + Sync {
         Distribution::UnspecifiedDistribution
     }
 
-    /// Returns `true` if the direct children of this `ExecutionPlan` should 
be repartitioned
-    /// to introduce greater concurrency to the plan
+    /// Returns `true` if this operator relies on its inputs being

Review comment:
       Here is the new API for `ExecutionPlan` that signals how and when repartitioning occurs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to