xudong963 commented on a change in pull request #1776:
URL: https://github.com/apache/arrow-datafusion/pull/1776#discussion_r801727009



##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
     }
 }
 
+/// Recursively visits all `plan`s puts and then optionally adds a
+/// `RepartitionExec` at the output of `plan` to match
+/// `target_partitions`
+///
+/// if `can_reorder` is false, means that the output of this node
+/// can not be reordered as as something upstream is relying on that order
+///
+/// If 'would_benefit` is false, the upstream operator doesn't
+///  benefit from additional reordering
+///
 fn optimize_partitions(
     target_partitions: usize,
     plan: Arc<dyn ExecutionPlan>,
-    should_repartition: bool,
+    can_reorder: bool,
+    would_benefit: bool,

Review comment:
       ~~Not very understand the variable, If 'would_benefit` is false, the 
upstream operator doesn't benefit from additional reordering, but wouldn't 
produce wrong results? So it's ok to repartition to benefit from high 
parallelism? If so, I think the variable is needless.~~
   
   I noticed the annotation of the `benefits_from_input_partitioning` function 
👍, the variable makes sense to me. 

##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -142,29 +142,64 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// Specifies the output partitioning scheme of this plan
     fn output_partitioning(&self) -> Partitioning;
 
+    /// If the output of this operator is sorted, returns `Some(keys)`
+    /// with the description of how it was sorted.
+    ///
+    /// For example, Sort, (obviously) produces sorted output as does
+    /// SortPreservingMergeStream. Less obviously `Projection`
+    /// produces sorted output if its input was sorted as it does not
+    /// reorder the input rows
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
+
     /// Specifies the data distribution requirements of all the children for 
this operator
     fn required_child_distribution(&self) -> Distribution {
         Distribution::UnspecifiedDistribution
     }
 
-    /// Returns `true` if the direct children of this `ExecutionPlan` should 
be repartitioned
-    /// to introduce greater concurrency to the plan
+    /// Returns `true` if this operator relies on its inputs being
+    /// produced in a certain order (for example that they are sorted a 
particular way) for correctness.
     ///
-    /// The default implementation returns `true` unless 
`Self::required_child_distribution`
-    /// returns `Distribution::SinglePartition`
+    /// If `true` is returned, DataFusion will not apply certain
+    /// optimizations which might reorder the inputs (such as
+    /// repartitioning to increase concurrency).
     ///
-    /// Operators that do not benefit from additional partitioning may want to 
return `false`
-    fn should_repartition_children(&self) -> bool {
-        !matches!(
-            self.required_child_distribution(),
-            Distribution::SinglePartition
-        )
+    /// The default implementation returns `false`
+    fn relies_on_input_order(&self) -> bool {
+        false
+    }
+
+    /// Returns `false` if this operator's implementation may reorder
+    /// rows within or between partitions.
+    ///
+    /// For example, Projection, Filter, and Limit maintain the order
+    /// of inputs -- they may transform values (Projection) or not
+    /// produce the same number of rows that went in (Filter and
+    /// Limit), but the rows that are produced go in the same way.
+    ///
+    /// DataFusion uses this metadata to apply certain optimizations
+    /// such as automatically repartitioning correctly.
+    ///
+    /// The default implementation returns `false`
+    fn maintains_input_order(&self) -> bool {
+        false
+    }
+
+    /// Returns `true` if this operator would benefit from
+    /// partitioning its input (and thus from more parallelism). For
+    /// operators that do very little work the overhead of extra
+    /// parallelism may outweigh any benefits
+    ///
+    /// The default implementation returns `true`
+    fn benefits_from_input_partitioning(&self) -> bool {

Review comment:
       I noticed the return value of  `sort`, `limit`, `union` is **false**,  
so I want to know how to decide the result? In other words, how to decide the 
overhead of extra parallelism may outweigh any benefits? Is this an empirical 
estimate?

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -75,7 +204,7 @@ fn optimize_partitions(
     // But also not very useful to inlude

Review comment:
       history typo
   ```suggestion
       // But also not very useful to include
   ```
   

##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
     }
 }
 
+/// Recursively visits all `plan`s puts and then optionally adds a
+/// `RepartitionExec` at the output of `plan` to match
+/// `target_partitions`
+///
+/// if `can_reorder` is false, means that the output of this node
+/// can not be reordered as as something upstream is relying on that order
+///
+/// If 'would_benefit` is false, the upstream operator doesn't
+///  benefit from additional reordering

Review comment:
       typo?
   ```suggestion
   ///  benefit from additional repartition
   ```
   I think we can add some annotation to make users see the annotation of the 
`benefits_from_input_partitioning` function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to