alamb commented on a change in pull request #1776:
URL: https://github.com/apache/arrow-datafusion/pull/1776#discussion_r801887204



##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -142,29 +142,64 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
 
+    /// If the output of this operator is sorted, returns `Some(keys)`
+    /// with the description of how it was sorted.
+    ///
+    /// For example, Sort, (obviously) produces sorted output as does
+    /// SortPreservingMergeStream. Less obviously `Projection`
+    /// produces sorted output if its input was sorted as it does not
+    /// reorder the input rows
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
+
     /// Specifies the data distribution requirements of all the children for 
this operator
     fn required_child_distribution(&self) -> Distribution {
         Distribution::UnspecifiedDistribution
     }
 
-    /// Returns `true` if the direct children of this `ExecutionPlan` should 
be repartitioned
-    /// to introduce greater concurrency to the plan
+    /// Returns `true` if this operator relies on its inputs being
+    /// produced in a certain order (for example that they are sorted a 
particular way) for correctness.
     ///
-    /// The default implementation returns `true` unless 
`Self::required_child_distribution`
-    /// returns `Distribution::SinglePartition`
+    /// If `true` is returned, DataFusion will not apply certain
+    /// optimizations which might reorder the inputs (such as
+    /// repartitioning to increase concurrency).
     ///
-    /// Operators that do not benefit from additional partitioning may want to 
return `false`
-    fn should_repartition_children(&self) -> bool {
-        !matches!(
-            self.required_child_distribution(),
-            Distribution::SinglePartition
-        )
+    /// The default implementation returns `false`
+    fn relies_on_input_order(&self) -> bool {
+        false
+    }
+
+    /// Returns `false` if this operator's implementation may reorder
+    /// rows within or between partitions.
+    ///
+    /// For example, Projection, Filter, and Limit maintain the order
+    /// of inputs -- they may transform values (Projection) or not
+    /// produce the same number of rows that went in (Filter and
+    /// Limit), but the rows that are produced go in the same way.
+    ///
+    /// DataFusion uses this metadata to apply certain optimizations
+    /// such as automatically repartitioning correctly.
+    ///
+    /// The default implementation returns `false`
+    fn maintains_input_order(&self) -> bool {
+        false
+    }
+
+    /// Returns `true` if this operator would benefit from
+    /// partitioning its input (and thus from more parallelism). For
+    /// operators that do very little work the overhead of extra
+    /// parallelism may outweigh any benefits
+    ///
+    /// The default implementation returns `true`
+    fn benefits_from_input_partitioning(&self) -> bool {
+        // give me MOAR CPUs
+        true

Review comment:
       I was trying to separate the notions of correctness from possible 
optimizations; However when I type out the rationale it doesn't really hold up; 
I will put it back. 

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
     }
 }
 
+/// Recursively visits all `plan`s puts and then optionally adds a
+/// `RepartitionExec` at the output of `plan` to match
+/// `target_partitions`
+///
+/// if `can_reorder` is false, means that the output of this node
+/// can not be reordered as as something upstream is relying on that order
+///
+/// If 'would_benefit` is false, the upstream operator doesn't
+///  benefit from additional reordering
+///
 fn optimize_partitions(
     target_partitions: usize,
     plan: Arc<dyn ExecutionPlan>,
-    should_repartition: bool,
+    can_reorder: bool,
+    would_benefit: bool,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    // Recurse into children bottom-up (added nodes should be as deep as 
possible)
+    // Recurse into children bottom-up (attempt to repartition as
+    // early as possible)
 
     let new_plan = if plan.children().is_empty() {
         // leaf node - don't replace children
         plan.clone()
     } else {
-        let should_repartition_children = plan.should_repartition_children();
+        let can_reorder_children =
+            match (plan.relies_on_input_order(), plan.maintains_input_order()) 
{
+                (true, _) => {
+                    // `plan` itself relies on the order of its
+                    // children, so don't reorder them!
+                    false
+                }
+                (false, false) => {
+                    // `plan` may reorder the input itself, so no need
+                    // to preserve the order of any children
+                    true

Review comment:
       Interestingly when I add the `requres_single_partition` case here I get 
failures with the tests on
   
   ```
   [
       "SortPreservingMergeExec: [c1@0 ASC]",
       "SortExec: [c1@0 ASC]",
       "ProjectionExec: expr=[c1@0 as c1]",
       "RepartitionExec: partitioning=RoundRobinBatch(10)",
       "ParquetExec: limit=None, partitions=[x]",
   ]
   actual:
   
   [
       "SortPreservingMergeExec: [c1@0 ASC]",
       "SortExec: [c1@0 ASC]",
       "ProjectionExec: expr=[c1@0 as c1]",
       "ParquetExec: limit=None, partitions=[x]",
   ]
   ```
   
   Aka repartitioning doesn't two levels down.
   
   So rather than intermix the "should we bother repartitioning" with the 
"would it produce wrong answers" I simply removed the check for the required 
input partitioning and it is now included in the default  "benefits from 
repartitioning check" 

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -142,29 +142,64 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
 
+    /// If the output of this operator is sorted, returns `Some(keys)`
+    /// with the description of how it was sorted.
+    ///
+    /// For example, Sort, (obviously) produces sorted output as does
+    /// SortPreservingMergeStream. Less obviously `Projection`
+    /// produces sorted output if its input was sorted as it does not
+    /// reorder the input rows
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
+
     /// Specifies the data distribution requirements of all the children for 
this operator
     fn required_child_distribution(&self) -> Distribution {
         Distribution::UnspecifiedDistribution
     }
 
-    /// Returns `true` if the direct children of this `ExecutionPlan` should 
be repartitioned
-    /// to introduce greater concurrency to the plan
+    /// Returns `true` if this operator relies on its inputs being
+    /// produced in a certain order (for example that they are sorted a 
particular way) for correctness.
     ///
-    /// The default implementation returns `true` unless 
`Self::required_child_distribution`
-    /// returns `Distribution::SinglePartition`
+    /// If `true` is returned, DataFusion will not apply certain
+    /// optimizations which might reorder the inputs (such as
+    /// repartitioning to increase concurrency).
     ///
-    /// Operators that do not benefit from additional partitioning may want to 
return `false`
-    fn should_repartition_children(&self) -> bool {
-        !matches!(
-            self.required_child_distribution(),
-            Distribution::SinglePartition
-        )
+    /// The default implementation returns `false`
+    fn relies_on_input_order(&self) -> bool {
+        false
+    }
+
+    /// Returns `false` if this operator's implementation may reorder
+    /// rows within or between partitions.
+    ///
+    /// For example, Projection, Filter, and Limit maintain the order
+    /// of inputs -- they may transform values (Projection) or not
+    /// produce the same number of rows that went in (Filter and
+    /// Limit), but the rows that are produced go in the same way.
+    ///
+    /// DataFusion uses this metadata to apply certain optimizations
+    /// such as automatically repartitioning correctly.
+    ///
+    /// The default implementation returns `false`
+    fn maintains_input_order(&self) -> bool {

Review comment:
       Yes. I will make this clearer in the comments




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to