andygrove commented on issue #1907:
URL: https://github.com/apache/datafusion-ballista/issues/1907#issuecomment-4819329415

   **Update — root cause is data amplification, not slow decode.**
   
   Measuring the partial-aggregate input rows (`reduction_factor` metric, which 
is reliable, unlike the merged scan `output_rows`):
   
   | q1 partitions | slots | input rows to partial aggregate |
   |---:|---:|---:|
   | 8 | 8 (all wave 1) | **59.1 M** (≈ table, correct) |
   | 9 | 8 (8 wave-1 + 1 wave-2) | **118.3 M** |
   
   Going from 8→9 partitions added **one** task that runs in the second wave, 
and total processed rows jumped by ~59 M — i.e. that **single second-wave task 
scanned the entire table (~59 M rows) instead of its ~1/9 file-range 
partition.**
   
   So: **tasks that run after the first wave scan the whole dataset instead of 
their assigned file range.** Key properties:
   - It is **wave-/slot-dependent, not partition-index-dependent**: with 
`-c16`, all 16 partitions of `q1 --partitions 16` run in one wave and the query 
is fast (0.3 s, ~59 M rows total); with `-c8` the same plan runs in two waves 
and the second-wave tasks each re-scan the whole table (4.2 s). Same serialized 
plan, same file groups — only the number of concurrent slots (waves) differs.
   - It **resets per query** (the first wave is always correct/fast), so it is 
tied to per-stage execution state, not process-global accumulation.
   - CPU profile of a single slow task is dominated by aggregate accumulate + 
group-by-on-string-columns + snappy decode — consistent with simply processing 
~8× the rows.
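
A rough model of the amplification described above, as a sketch rather than anything taken from Ballista. It assumes the table splits evenly across partitions, that first-wave tasks scan only their own share, and that every later-wave task scans the whole table. The function names and the even-split assumption are mine; 59.1 M is the measured row count from the table above.

```python
def later_wave_tasks(partitions: int, slots: int) -> int:
    """Number of tasks that cannot start in the first wave."""
    return max(0, partitions - slots)


def modeled_total_rows(table_rows: float, partitions: int, slots: int) -> float:
    """Total rows scanned under the hypothesis in this comment.

    Assumptions (not taken from Ballista): partitions are equal-sized,
    first-wave tasks read only their share, later-wave tasks read everything.
    """
    later = later_wave_tasks(partitions, slots)
    first = partitions - later
    share = table_rows / partitions
    return first * share + later * table_rows


def amplification(partitions: int, slots: int) -> float:
    """Modeled rows scanned, as a multiple of the table size."""
    return modeled_total_rows(1.0, partitions, slots)


TABLE_ROWS_M = 59.1  # measured partial-aggregate input for the 8-partition run

for partitions, slots in [(8, 8), (9, 8), (16, 8), (16, 16)]:
    total = modeled_total_rows(TABLE_ROWS_M, partitions, slots)
    print(f"partitions={partitions:2d} slots={slots:2d} "
          f"modeled rows={total:6.1f} M "
          f"amplification={amplification(partitions, slots):.2f}x")
```

Under these assumptions, 16 partitions on 8 slots comes out to roughly 8.5× the table, while 16 partitions on 16 slots stays at 1×. The even split is a simplification, so the model will not reproduce the measured totals exactly.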
   
   This explains the earlier symptoms (≈N× scan inefficiency, disk-full on join 
queries from shuffling N× data). It looks like later-wave tasks fail to 
restrict the parquet scan to their `file_group`'s byte range and read all row 
groups. This is possibly related to the DF54 parquet footer/metadata handling 
(we hit metadata-cache changes when upgrading Comet). Repro, on an 8-slot 
executor: 
`tpch benchmark ballista --query 1 --partitions 9 --iterations 1 -c datafusion.optimizer.prefer_hash_join=false`
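
To illustrate the suspected failure mode, here is a minimal sketch of byte-range row-group selection. It assumes a row group belongs to whichever range contains its starting offset, which is a simplified model and not DataFusion's actual code. `select_row_groups` and the offsets are hypothetical. If a task's range were lost (modeled here as `None`), it would select every row group, which is the full-table scan behaviour suspected for later-wave tasks.

```python
from typing import List, Optional, Sequence, Tuple


def select_row_groups(
    row_group_offsets: Sequence[int],
    byte_range: Optional[Tuple[int, int]],
) -> List[int]:
    """Return indices of row groups a task should read.

    Hypothetical helper: a row group is selected when its starting offset
    falls in the half-open range [start, end). With no range, nothing is
    filtered out and the task reads the whole file.
    """
    if byte_range is None:
        return list(range(len(row_group_offsets)))
    start, end = byte_range
    return [
        index
        for index, offset in enumerate(row_group_offsets)
        if start <= offset < end
    ]


# Made-up starting offsets for four row groups in one file.
offsets = [4, 1000, 2000, 3000]

print(select_row_groups(offsets, (0, 1500)))     # task with the first half
print(select_row_groups(offsets, (1500, 4000)))  # task with the second half
print(select_row_groups(offsets, None))          # range missing: reads all
```

Because the ranges are half-open and non-overlapping, each row group is read by exactly one task as long as every task keeps its range.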
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

