viirya commented on code in PR #8731:
URL: https://github.com/apache/arrow-datafusion/pull/8731#discussion_r1440738248
##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1198,37 +1191,25 @@ fn ensure_distribution(
)
.map(
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
- // Don't need to apply when the returned row count is not greater than 1:
+ // Don't need to apply when the returned row count is not greater than batch size
let num_rows = child.plan.statistics()?.num_rows;
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
.map(|value| value > &batch_size)
- .unwrap_or(true)
+ .unwrap() // safe to unwrap since is_exact() is true
} else {
true
};
- if enable_round_robin
- // Operator benefits from partitioning (e.g. filter):
- && (would_benefit && repartition_beneficial_stats)
- // Unless partitioning doesn't increase the partition count, it is not beneficial:
- && child.plan.output_partitioning().partition_count() < target_partitions
- {
- // When `repartition_file_scans` is set, attempt to increase
- // parallelism at the source.
- if repartition_file_scans {
- if let Some(new_child) =
- child.plan.repartitioned(target_partitions, config)?
- {
- child.plan = new_child;
- }
+ // When `repartition_file_scans` is set, attempt to increase
+ // parallelism at the source.
+ if repartition_file_scans {
+ if let Some(new_child) =
+ child.plan.repartitioned(target_partitions, config)?
+ {
+ child.plan = new_child;
}
- // Increase parallelism by adding round-robin repartitioning
- // on top of the operator. Note that we only do this if the
- // partition count is not already equal to the desired partition
- // count.
- child = add_roundrobin_on_top(child, target_partitions)?;
Review Comment:
> The idea as you said is to increase parallelism of the hash-repartition
> operator. `target_partitions` in most cases / by design is currently bound to
> the CPU.

Yes, but I think that in most cases hash repartitioning already receives
multi-partition input (unless its input is intentionally partitioned into a
single partition). If the scans are well partitioned, later operators follow
that partitioning.

Also, I don't see an obvious effect on performance so far (based on my local
benchmark results). In fact, I think the additional round-robin repartitioning
mostly doesn't help and adds a small cost (round-robin repartitioning is not a
zero-cost operation).
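
For reference, the guard that this hunk removes can be restated as a small standalone sketch. This is only an illustration, not the DataFusion code: `RowCount` and `wants_round_robin` are hypothetical names, and `RowCount::Unknown` stands in for any row count that is not exact. The conditions themselves are taken from the removed lines in the diff.

```rust
/// Hypothetical stand-in for a row-count statistic that is either exact or not.
#[derive(Clone, Copy)]
enum RowCount {
    Exact(usize),
    Unknown,
}

/// Illustrative restatement of the removed guard: round-robin repartitioning
/// was added only when all four conditions held.
fn wants_round_robin(
    enable_round_robin: bool,
    would_benefit: bool,
    num_rows: RowCount,
    batch_size: usize,
    current_partitions: usize,
    target_partitions: usize,
) -> bool {
    // With exact statistics, repartitioning only pays off when the input
    // is larger than one batch; without them, assume it might.
    let repartition_beneficial_stats = match num_rows {
        RowCount::Exact(n) => n > batch_size,
        RowCount::Unknown => true,
    };
    enable_round_robin
        && would_benefit
        && repartition_beneficial_stats
        // Not beneficial unless it increases the partition count.
        && current_partitions < target_partitions
}

fn main() {
    // Input already at the target partition count: nothing is added.
    println!("{}", wants_round_robin(true, true, RowCount::Unknown, 8192, 8, 8));
    // Single-partition input with unknown statistics: round-robin is added.
    println!("{}", wants_round_robin(true, true, RowCount::Unknown, 8192, 1, 8));
}
```

The first call corresponds to the well-partitioned case described above, where the guard never fired anyway; only inputs with fewer partitions than `target_partitions` were affected.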