tustvold commented on a change in pull request #1776:
URL: https://github.com/apache/arrow-datafusion/pull/1776#discussion_r801663379
##########
File path: datafusion/src/physical_plan/limit.rs
##########
@@ -232,6 +249,24 @@ impl ExecutionPlan for LocalLimitExec {
self.input.output_partitioning()
}
+ fn relies_on_input_order(&self) -> bool {
+ self.input.output_ordering().is_some()
Review comment:
This feels like an optimization that really belongs in the Repartition
optimizer, namely that if the children of a plan don't have a sort order, you
can freely repartition them even if the parent `relies_on_input_order`.
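
A minimal sketch of the rule suggested above, not the actual optimizer code. The function name `can_repartition_child` and the use of plain column names in place of `PhysicalSortExpr` are assumptions made for illustration.

```rust
/// Decide whether a child's rows may be reordered (for example by
/// round-robin repartitioning) without changing the parent's result.
///
/// `child_ordering` is a simplified, hypothetical stand-in for the result
/// of `output_ordering()`: sort keys reduced to column names.
fn can_repartition_child(
    parent_relies_on_input_order: bool,
    child_ordering: Option<&[String]>,
) -> bool {
    // If the child declares no sort order there is nothing to preserve,
    // so the parent's reliance on input order does not block repartitioning.
    !parent_relies_on_input_order || child_ordering.is_none()
}
```

With this shape, `LocalLimitExec` could report `relies_on_input_order` unconditionally and leave the "is the input actually sorted" check to the optimizer.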
##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -142,29 +142,64 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
+ /// If the output of this operator is sorted, returns `Some(keys)`
+ /// with the description of how it was sorted.
+ ///
+ /// For example, Sort, (obviously) produces sorted output as does
+ /// SortPreservingMergeStream. Less obviously `Projection`
+ /// produces sorted output if its input was sorted as it does not
+ /// reorder the input rows
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
+
/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}
- /// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned
- /// to introduce greater concurrency to the plan
+ /// Returns `true` if this operator relies on its inputs being
+ /// produced in a certain order (for example that they are sorted a particular way) for correctness.
///
- /// The default implementation returns `true` unless `Self::required_child_distribution`
- /// returns `Distribution::SinglePartition`
+ /// If `true` is returned, DataFusion will not apply certain
+ /// optimizations which might reorder the inputs (such as
+ /// repartitioning to increase concurrency).
///
- /// Operators that do not benefit from additional partitioning may want to return `false`
- fn should_repartition_children(&self) -> bool {
- !matches!(
- self.required_child_distribution(),
- Distribution::SinglePartition
- )
+ /// The default implementation returns `false`
+ fn relies_on_input_order(&self) -> bool {
+ false
Review comment:
I think a default of `true` would be safer...
##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -142,29 +142,64 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
+ /// If the output of this operator is sorted, returns `Some(keys)`
+ /// with the description of how it was sorted.
+ ///
+ /// For example, Sort, (obviously) produces sorted output as does
+ /// SortPreservingMergeStream. Less obviously `Projection`
+ /// produces sorted output if its input was sorted as it does not
+ /// reorder the input rows
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
+
/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}
- /// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned
- /// to introduce greater concurrency to the plan
+ /// Returns `true` if this operator relies on its inputs being
+ /// produced in a certain order (for example that they are sorted a particular way) for correctness.
///
- /// The default implementation returns `true` unless `Self::required_child_distribution`
- /// returns `Distribution::SinglePartition`
+ /// If `true` is returned, DataFusion will not apply certain
+ /// optimizations which might reorder the inputs (such as
+ /// repartitioning to increase concurrency).
///
- /// Operators that do not benefit from additional partitioning may want to return `false`
- fn should_repartition_children(&self) -> bool {
- !matches!(
- self.required_child_distribution(),
- Distribution::SinglePartition
- )
+ /// The default implementation returns `false`
+ fn relies_on_input_order(&self) -> bool {
+ false
+ }
+
+ /// Returns `false` if this operator's implementation may reorder
+ /// rows within or between partitions.
+ ///
+ /// For example, Projection, Filter, and Limit maintain the order
+ /// of inputs -- they may transform values (Projection) or not
+ /// produce the same number of rows that went in (Filter and
+ /// Limit), but the rows that are produced go in the same way.
+ ///
+ /// DataFusion uses this metadata to apply certain optimizations
+ /// such as automatically repartitioning correctly.
+ ///
+ /// The default implementation returns `false`
+ fn maintains_input_order(&self) -> bool {
+ false
+ }
+
+ /// Returns `true` if this operator would benefit from
+ /// partitioning its input (and thus from more parallelism). For
+ /// operators that do very little work the overhead of extra
+ /// parallelism may outweigh any benefits
+ ///
+ /// The default implementation returns `true`
+ fn benefits_from_input_partitioning(&self) -> bool {
+ // give me MOAR CPUs
+ true
Review comment:
Why did you remove the `required_child_distribution` check?
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -219,20 +384,64 @@ mod tests {
"ParquetExec: limit=None, partitions=[x]",
];
- assert_eq!(&trim_plan_display(&plan), &expected);
+ assert_optimized!(expected, plan);
Ok(())
}
#[test]
- fn repartition_ignores_limit() -> Result<()> {
- let optimizer = Repartition {};
+ fn repartition_unsorted_limit() -> Result<()> {
Review comment:
These tests are :ok_hand:
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
}
}
+/// Recursively visits all `plan`s puts and then optionally adds a
Review comment:
For my own understanding I'm going to write out what this does.
It does a depth-first scan of the tree and repartitions any plan that:
* Has fewer than the desired number of partitions
* Has a direct parent that `benefits_from_input_partitioning`
* Does not have an ancestor that `relies_on_input_order`, unless there is an
intervening node for which `maintains_input_order` is `false`
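
The three conditions can be sketched on a toy plan tree. This is a simplified illustration, not the PR's code: the `Node` struct and `optimize` function are hypothetical stand-ins for `ExecutionPlan` and `optimize_partitions`, and the `(false, true)` match arm is an assumption inferred from the summary above.

```rust
/// Hypothetical, simplified stand-in for a physical plan node.
#[derive(Debug, Clone)]
struct Node {
    name: &'static str,
    partitions: usize,
    relies_on_input_order: bool,
    maintains_input_order: bool,
    benefits_from_input_partitioning: bool,
    children: Vec<Node>,
}

/// Depth-first rewrite: children are visited first, then `node` itself is
/// wrapped in a "RepartitionExec" when all three conditions hold.
fn optimize(target: usize, mut node: Node, can_reorder: bool, would_benefit: bool) -> Node {
    let can_reorder_children = match (node.relies_on_input_order, node.maintains_input_order) {
        // `node` relies on the order of its children: do not reorder them.
        (true, _) => false,
        // `node` may reorder rows itself: nothing above can depend on child order.
        (false, false) => true,
        // `node` passes order through: inherit the constraint from above (assumed).
        (false, true) => can_reorder,
    };
    let children_benefit = node.benefits_from_input_partitioning;

    let children = std::mem::take(&mut node.children);
    node.children = children
        .into_iter()
        .map(|child| optimize(target, child, can_reorder_children, children_benefit))
        .collect();

    if would_benefit && can_reorder && node.partitions < target {
        // The toy model does not update the partition counts of ancestors.
        Node {
            name: "RepartitionExec",
            partitions: target,
            relies_on_input_order: false,
            maintains_input_order: false,
            benefits_from_input_partitioning: false,
            children: vec![node],
        }
    } else {
        node
    }
}
```

Here `would_benefit` only reflects the direct parent, while `can_reorder` carries information from further up the tree.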
##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -142,29 +142,64 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
+ /// If the output of this operator is sorted, returns `Some(keys)`
+ /// with the description of how it was sorted.
+ ///
+ /// For example, Sort, (obviously) produces sorted output as does
+ /// SortPreservingMergeStream. Less obviously `Projection`
+ /// produces sorted output if its input was sorted as it does not
+ /// reorder the input rows
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
+
/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}
- /// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned
- /// to introduce greater concurrency to the plan
+ /// Returns `true` if this operator relies on its inputs being
+ /// produced in a certain order (for example that they are sorted a particular way) for correctness.
///
- /// The default implementation returns `true` unless `Self::required_child_distribution`
- /// returns `Distribution::SinglePartition`
+ /// If `true` is returned, DataFusion will not apply certain
+ /// optimizations which might reorder the inputs (such as
+ /// repartitioning to increase concurrency).
///
- /// Operators that do not benefit from additional partitioning may want to return `false`
- fn should_repartition_children(&self) -> bool {
- !matches!(
- self.required_child_distribution(),
- Distribution::SinglePartition
- )
+ /// The default implementation returns `false`
+ fn relies_on_input_order(&self) -> bool {
+ false
+ }
+
+ /// Returns `false` if this operator's implementation may reorder
+ /// rows within or between partitions.
+ ///
+ /// For example, Projection, Filter, and Limit maintain the order
+ /// of inputs -- they may transform values (Projection) or not
+ /// produce the same number of rows that went in (Filter and
+ /// Limit), but the rows that are produced go in the same way.
+ ///
+ /// DataFusion uses this metadata to apply certain optimizations
+ /// such as automatically repartitioning correctly.
+ ///
+ /// The default implementation returns `false`
+ fn maintains_input_order(&self) -> bool {
Review comment:
I spent a long time trying to understand why there is both this and
`output_ordering`. It is because this indicates whether the operator preserves
its input order, not whether the output is actually sorted :sweat_smile:
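
A small sketch of the distinction, under simplifying assumptions: sort keys are reduced to column names, and `passthrough_output_ordering` is a hypothetical helper for operators such as `Projection` or `Filter` that do not sort on their own.

```rust
/// For an operator that does not sort by itself, the output is sorted only
/// when the operator preserves row order AND its input was sorted.
///
/// `input_ordering` is a simplified stand-in for the child's
/// `output_ordering()` result.
fn passthrough_output_ordering<'a>(
    maintains_input_order: bool,
    input_ordering: Option<&'a [String]>,
) -> Option<&'a [String]> {
    if maintains_input_order {
        // Order is preserved, so whatever ordering the input had carries over.
        input_ordering
    } else {
        // Rows may be shuffled, so no ordering can be claimed.
        None
    }
}
```

An order-preserving operator over unsorted input still returns `None` here, which is why the two methods answer different questions.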
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
}
}
+/// Recursively visits all `plan`s puts and then optionally adds a
+/// `RepartitionExec` at the output of `plan` to match
+/// `target_partitions`
+///
+/// if `can_reorder` is false, means that the output of this node
+/// can not be reordered as as something upstream is relying on that order
+///
+/// If 'would_benefit` is false, the upstream operator doesn't
+/// benefit from additional reordering
+///
fn optimize_partitions(
target_partitions: usize,
plan: Arc<dyn ExecutionPlan>,
- should_repartition: bool,
+ can_reorder: bool,
+ would_benefit: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
- // Recurse into children bottom-up (added nodes should be as deep as possible)
+ // Recurse into children bottom-up (attempt to repartition as
+ // early as possible)
let new_plan = if plan.children().is_empty() {
// leaf node - don't replace children
plan.clone()
} else {
- let should_repartition_children = plan.should_repartition_children();
+ let can_reorder_children =
+ match (plan.relies_on_input_order(), plan.maintains_input_order()) {
+ (true, _) => {
+ // `plan` itself relies on the order of its
+ // children, so don't reorder them!
+ false
+ }
+ (false, false) => {
+ // `plan` may reorder the input itself, so no need
+ // to preserve the order of any children
+ true
Review comment:
I think this has lost the `requires_single_partition` case. That being
said, I'm not sure why this matters: a `CoalesceBatches` will just be inserted?
Perhaps `would_benefit` should be set to false if this requires a single
partition, as this won't propagate beyond the direct children? :thinking:
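
One way the last idea could look, as a rough sketch only. The local `Distribution` enum is a reduced, hypothetical copy of the real one, and `children_would_benefit` is an invented helper name.

```rust
/// Reduced, hypothetical stand-in for DataFusion's `Distribution` enum.
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
}

/// Value to pass down as `would_benefit` for the direct children of a plan.
fn children_would_benefit(
    benefits_from_input_partitioning: bool,
    required_child_distribution: &Distribution,
) -> bool {
    // A plan that needs a single input partition gains nothing from having
    // its direct children repartitioned, whatever it reports otherwise.
    benefits_from_input_partitioning
        && !matches!(required_child_distribution, Distribution::SinglePartition)
}
```

Because the flag is recomputed at every level, the restriction applies to the direct children only and deeper nodes are unaffected.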
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
}
}
+/// Recursively visits all `plan`s puts and then optionally adds a
+/// `RepartitionExec` at the output of `plan` to match
+/// `target_partitions`
+///
+/// if `can_reorder` is false, means that the output of this node
+/// can not be reordered as as something upstream is relying on that order
+///
+/// If 'would_benefit` is false, the upstream operator doesn't
+/// benefit from additional reordering
Review comment:
```suggestion
/// benefit from additional partitioning
```