viirya commented on code in PR #8731:
URL: https://github.com/apache/arrow-datafusion/pull/8731#discussion_r1440194056
##########
datafusion/core/src/physical_optimizer/enforce_distribution.rs:
##########
@@ -1198,37 +1191,25 @@ fn ensure_distribution(
)
.map(
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
- // Don't need to apply when the returned row count is not greater than 1:
+ // Don't need to apply when the returned row count is not greater than batch size
let num_rows = child.plan.statistics()?.num_rows;
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
.map(|value| value > &batch_size)
- .unwrap_or(true)
+ .unwrap() // safe to unwrap since is_exact() is true
} else {
true
};
- if enable_round_robin
- // Operator benefits from partitioning (e.g. filter):
- && (would_benefit && repartition_beneficial_stats)
- // Unless partitioning doesn't increase the partition count, it is not beneficial:
- && child.plan.output_partitioning().partition_count() < target_partitions
- {
- // When `repartition_file_scans` is set, attempt to increase
- // parallelism at the source.
- if repartition_file_scans {
- if let Some(new_child) =
- child.plan.repartitioned(target_partitions, config)?
- {
- child.plan = new_child;
- }
+ // When `repartition_file_scans` is set, attempt to increase
+ // parallelism at the source.
+ if repartition_file_scans {
+ if let Some(new_child) =
+ child.plan.repartitioned(target_partitions, config)?
+ {
+ child.plan = new_child;
}
- // Increase parallelism by adding round-robin repartitioning
- // on top of the operator. Note that we only do this if the
- // partition count is not already equal to the desired partition
- // count.
- child = add_roundrobin_on_top(child, target_partitions)?;
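The statistics check that the diff keeps (`repartition_beneficial_stats`) can be exercised on its own. This minimal sketch uses a hypothetical, simplified `RowCount` enum in place of DataFusion's statistics type, assuming `is_exact()` returns `Option<bool>` and `get_value()` returns `Option<&usize>`, as the calls in the diff suggest. It illustrates the diff's comment that the `.unwrap()` is safe: an exact statistic always carries a value.

```rust
/// Hypothetical stand-in for a row-count statistic; not DataFusion's actual type.
#[derive(Debug, Clone, Copy)]
enum RowCount {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl RowCount {
    /// `Some(true)` for exact, `Some(false)` for inexact, `None` when absent.
    fn is_exact(&self) -> Option<bool> {
        match self {
            RowCount::Exact(_) => Some(true),
            RowCount::Inexact(_) => Some(false),
            RowCount::Absent => None,
        }
    }

    /// The recorded value, if any.
    fn get_value(&self) -> Option<&usize> {
        match self {
            RowCount::Exact(v) | RowCount::Inexact(v) => Some(v),
            RowCount::Absent => None,
        }
    }
}

/// Mirrors the `repartition_beneficial_stats` logic from the diff: only an
/// exact row count at or below `batch_size` disables repartitioning.
fn repartition_beneficial_stats(num_rows: RowCount, batch_size: usize) -> bool {
    if num_rows.is_exact().unwrap_or(false) {
        num_rows
            .get_value()
            .map(|value| value > &batch_size)
            // An exact statistic always carries a value, so this cannot panic.
            .unwrap()
    } else {
        // Inexact or missing statistics: assume repartitioning may help.
        true
    }
}

fn main() {
    let batch_size = 8192; // example value for illustration
    for stat in [
        RowCount::Exact(10),
        RowCount::Exact(100_000),
        RowCount::Inexact(10),
        RowCount::Absent,
    ] {
        println!("{:?} -> {}", stat, repartition_beneficial_stats(stat, batch_size));
    }
}
```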
Review Comment:
This doesn't look correct to me, and the resulting query plan also looks confusing.
If there are already distribution requirements such as single partition or hash partitioning (i.e., the input will be repartitioned anyway), why do we add round-robin repartitioning below it, resulting in two repartitions?
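To make the concern concrete, here is a toy plan model (hypothetical types, not DataFusion's `ExecutionPlan`) that counts repartitioning operators under a hash-partitioning requirement. With a round-robin inserted below the hash repartition, the data goes through two exchanges; repartitioning at the scan instead needs only one.

```rust
/// Toy physical plan; hypothetical, for illustration only.
#[derive(Debug)]
enum Plan {
    Scan { partitions: usize },
    RoundRobin { partitions: usize, input: Box<Plan> },
    Hash { partitions: usize, input: Box<Plan> },
}

/// Number of repartitioning operators (exchanges) in the plan.
fn count_repartitions(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { .. } => 0,
        Plan::RoundRobin { input, .. } | Plan::Hash { input, .. } => {
            1 + count_repartitions(input)
        }
    }
}

/// Output partition count of the top operator.
fn output_partitions(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { partitions }
        | Plan::RoundRobin { partitions, .. }
        | Plan::Hash { partitions, .. } => *partitions,
    }
}

/// Round-robin inserted below the hash requirement (the behavior under review).
fn with_round_robin(target_partitions: usize) -> Plan {
    Plan::Hash {
        partitions: target_partitions,
        input: Box::new(Plan::RoundRobin {
            partitions: target_partitions,
            input: Box::new(Plan::Scan { partitions: 1 }),
        }),
    }
}

/// Scan repartitioned at the source, then hash-repartitioned once.
fn with_scan_repartitioning(target_partitions: usize) -> Plan {
    Plan::Hash {
        partitions: target_partitions,
        input: Box::new(Plan::Scan { partitions: target_partitions }),
    }
}

fn main() {
    for plan in [with_round_robin(16), with_scan_repartitioning(16)] {
        println!(
            "{} exchange(s), {} output partitions: {:?}",
            count_repartitions(&plan),
            output_partitions(&plan),
            plan
        );
    }
}
```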
Judging from the comment, the goal is to increase parallelism. But parallelism is bounded by the number of CPUs, whereas the partition count here is `target_partitions`. That means round-robin could repartition into far more partitions than there are cores (9000, for example, as I saw in one existing test case), and I don't think we will actually get parallelism equal to `target_partitions`. Not to mention that the additional round-robin repartitioning has its own cost.
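A back-of-the-envelope sketch of the parallelism point, assuming one worker thread per core (the default for a multi-threaded Tokio runtime). No more partitions than workers can run at once, so 9000 partitions on 8 cores still run 8 at a time, with each worker processing about 1125 partitions in turn. The helper names are hypothetical.

```rust
use std::thread;

/// Partitions that can make progress at the same moment, assuming one
/// worker thread per core. Hypothetical helper.
fn max_concurrent_partitions(partitions: usize, workers: usize) -> usize {
    partitions.min(workers)
}

/// Partitions each worker must process sequentially (ceiling division).
fn partitions_per_worker(partitions: usize, workers: usize) -> usize {
    assert!(workers > 0, "need at least one worker");
    (partitions + workers - 1) / workers
}

fn main() {
    // Number of cores visible to this process (falls back to 1 on error).
    let workers = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let target_partitions = 9000;
    println!(
        "{} workers: {} partitions run at most {} at a time, ~{} per worker",
        workers,
        target_partitions,
        max_concurrent_partitions(target_partitions, workers),
        partitions_per_worker(target_partitions, workers)
    );
}
```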
Regarding parallelism, it seems more correct to produce partitions at the scans (i.e., `repartition_file_scans`) so that the following operators naturally get the required parallelism, instead of inserting arbitrary round-robin repartitioning around distribution requirements.
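Producing parallelism at the source can be pictured as splitting a file's byte range into at most `target_partitions` contiguous chunks, so downstream operators inherit that partition count without an extra exchange. This is a simplified, hypothetical helper, not DataFusion's file-scan repartitioning code, which handles more cases than this.

```rust
use std::ops::Range;

/// Hypothetical helper: split a single file of `file_size` bytes into at most
/// `target_partitions` contiguous, non-overlapping byte ranges.
fn split_byte_ranges(file_size: u64, target_partitions: u64) -> Vec<Range<u64>> {
    if file_size == 0 || target_partitions == 0 {
        return Vec::new();
    }
    // Ceiling division so the ranges cover the whole file.
    let chunk = (file_size + target_partitions - 1) / target_partitions;
    let mut ranges = Vec::new();
    let mut start = 0;
    while start < file_size {
        let end = (start + chunk).min(file_size);
        ranges.push(start..end);
        start = end;
    }
    ranges
}

fn main() {
    // Each range would be read by its own scan partition.
    for (i, range) in split_byte_ranges(100, 3).iter().enumerate() {
        println!("partition {}: bytes {:?}", i, range);
    }
}
```

Small files simply yield fewer ranges than `target_partitions`, rather than padding the plan with empty partitions.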
I ran the benchmarks for this change. Overall, I don't see a significant regression. For example:
```
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ cleanup ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0 │ 3.19ms │ 3.13ms │ no change │
│ QQuery 1 │ 22.84ms │ 24.64ms │ 1.08x slower │
│ QQuery 2 │ 71.05ms │ 79.33ms │ 1.12x slower │
│ QQuery 3 │ 60.94ms │ 67.83ms │ 1.11x slower │
│ QQuery 4 │ 453.96ms │ 470.30ms │ no change │
│ QQuery 5 │ 1089.41ms │ 877.61ms │ +1.24x faster │
│ QQuery 6 │ 24.80ms │ 27.15ms │ 1.09x slower │
│ QQuery 7 │ 24.79ms │ 26.62ms │ 1.07x slower │
│ QQuery 8 │ 1001.42ms │ 1000.47ms │ no change │
│ QQuery 9 │ 1625.50ms │ 1653.77ms │ no change │
│ QQuery 10 │ 201.92ms │ 237.07ms │ 1.17x slower │
│ QQuery 11 │ 226.95ms │ 251.00ms │ 1.11x slower │
│ QQuery 12 │ 1023.53ms │ 791.79ms │ +1.29x faster │
│ QQuery 13 │ 1672.11ms │ 1435.32ms │ +1.16x faster │
│ QQuery 14 │ 1117.68ms │ 881.34ms │ +1.27x faster │
│ QQuery 15 │ 554.72ms │ 574.87ms │ no change │
│ QQuery 16 │ 1900.79ms │ 1840.62ms │ no change │
│ QQuery 17 │ 1884.34ms │ 1791.93ms │ no change │
│ QQuery 18 │ 4313.52ms │ 3465.75ms │ +1.24x faster │
│ QQuery 19 │ 43.34ms │ 53.79ms │ 1.24x slower │
│ QQuery 20 │ 1976.39ms │ 1850.32ms │ +1.07x faster │
│ QQuery 21 │ 2127.49ms │ 2010.90ms │ +1.06x faster │
│ QQuery 22 │ 4457.89ms │ 4859.12ms │ 1.09x slower │
│ QQuery 23 │ 8203.33ms │ 8276.48ms │ no change │
│ QQuery 24 │ 511.62ms │ 426.74ms │ +1.20x faster │
│ QQuery 25 │ 486.66ms │ 366.92ms │ +1.33x faster │
│ QQuery 26 │ 588.12ms │ 473.45ms │ +1.24x faster │
│ QQuery 27 │ 1692.65ms │ 1595.15ms │ +1.06x faster │
│ QQuery 28 │ 12640.52ms │ 12382.83ms │ no change │
│ QQuery 29 │ 407.09ms │ 447.76ms │ 1.10x slower │
│ QQuery 30 │ 968.69ms │ 770.84ms │ +1.26x faster │
│ QQuery 31 │ 1066.54ms │ 876.63ms │ +1.22x faster │
│ QQuery 32 │ 10732.72ms │ 8554.12ms │ +1.25x faster │
│ QQuery 33 │ 7478.91ms │ 6086.03ms │ +1.23x faster │
│ QQuery 34 │ 6516.26ms │ 6733.28ms │ no change │
│ QQuery 35 │ 1236.72ms │ 1207.24ms │ no change │
│ QQuery 36 │ 198.37ms │ 195.67ms │ no change │
│ QQuery 37 │ 99.59ms │ 99.33ms │ no change │
│ QQuery 38 │ 111.45ms │ 111.15ms │ no change │
│ QQuery 39 │ 432.10ms │ 432.58ms │ no change │
│ QQuery 40 │ 45.72ms │ 43.84ms │ no change │
│ QQuery 41 │ 41.19ms │ 41.65ms │ no change │
│ QQuery 42 │ 48.21ms │ 48.99ms │ no change │
└──────────────┴────────────┴────────────┴───────────────┘
```
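For reading the table: the `Change` column appears consistent with comparing `cleanup / main` against a ±5% noise band, reporting `main / cleanup` as "faster" below the band and `cleanup / main` as "slower" above it. The sketch below reproduces that labeling. The 5% threshold is an assumption inferred from the rows above (e.g. QQuery 17 at a ratio of 0.951 is "no change" while QQuery 21 at 0.945 is faster), not taken from the benchmark script.

```rust
/// Noise band: ratios within 5% of 1.0 are reported as "no change" (assumed).
const NOISE_THRESHOLD: f64 = 0.05;

/// Label a (main, cleanup) timing pair the way the `Change` column appears to.
fn change_label(main_ms: f64, cleanup_ms: f64) -> String {
    let ratio = cleanup_ms / main_ms;
    if (1.0 - NOISE_THRESHOLD..=1.0 + NOISE_THRESHOLD).contains(&ratio) {
        "no change".to_string()
    } else if ratio < 1.0 {
        format!("+{:.2}x faster", 1.0 / ratio)
    } else {
        format!("{:.2}x slower", ratio)
    }
}

fn main() {
    // A few rows copied from the clickbench table above: (query, main, cleanup).
    let rows = [
        (1, 22.84, 24.64),
        (5, 1089.41, 877.61),
        (17, 1884.34, 1791.93),
        (21, 2127.49, 2010.90),
    ];
    for (query, main_ms, cleanup_ms) in rows {
        println!("QQuery {:<3} {}", query, change_label(main_ms, cleanup_ms));
    }
}
```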