Satyr09 opened a new issue, #22982:
URL: https://github.com/apache/datafusion/issues/22982

   ### Is your feature request related to a problem or challenge?
   
   #22649 added the `max_row_group_bytes` Parquet writer option, but only the single-threaded writer (`allow_single_file_parallelism = false`) honours it. Under the default (`allow_single_file_parallelism = true`), the parallel writer in `datasource-parquet/src/sink.rs` decides row-group boundaries from `max_row_group_size` (a row count) alone and ignores the byte limit. For most users, the option therefore silently has no effect.
   
   Why it happens: a row-group boundary must fall at the same row index in every column, so the cut is decided centrally in the dispatcher (`spawn_parquet_parallel_serialization_task`). Encoded sizes, however, are only known inside the detached per-column tasks, on the far side of the column channels. The dispatcher knows how many rows it has dispatched but never how many bytes they encoded to, so it cannot apply a byte limit.
   
   ### Describe the solution you'd like
   
   Teach the parallel dispatcher to honour `max_row_group_bytes`, flushing on whichever of the row or byte limit is reached first. This would mirror `ArrowWriter`'s best-effort, per-batch, predictive split: the first batch of a row group is written whole, and subsequent batches are sized from the observed average encoded row size. When no byte limit is set, the path stays as it is today.
   
   The only real design question is how the dispatcher learns the per-column encoded sizes across the channels. Two approaches seem viable to me (happy to hear alternatives):
   
   - Synchronized estimate / barrier: column tasks report their encoded size. Before sizing each batch, the dispatcher waits for every column to catch up and sums the exact sizes. Boundaries are exact, deterministic, identical to the single-threaded writer's, and trivial to test. Cost: while a byte limit is set, the dispatcher pauses at every batch, so the parallel writer effectively processes one batch at a time and loses most of its pipelining.
   
   - Passive projection / no barrier: each column task posts its latest progress (writes done and estimated bytes) to a watch channel that keeps only the most recent value. The dispatcher reads these without waiting and turns each column's own rows and bytes into an average bytes-per-row. It then multiplies that average by the exact number of rows it has dispatched and sums across columns to estimate the current size. It blocks only once per row group, for the first report, so small limits stay predictable. After that it never waits, so the writer keeps streaming at full speed. The cost: boundaries are approximate (a group may end a batch early or late, with a small overshoot).
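A rough, std-only sketch of the first option (the barrier) follows. `SizeReport`, `spawn_column_task`, `wait_for_exact_size` and `barrier_demo` are hypothetical names. The column tasks are plain threads with a faked fixed bytes-per-row "encoder", and `std::sync::mpsc` stands in for whatever channels `sink.rs` actually uses. What matters is where the dispatcher stops: after every slice, it waits for all columns before it can size the next one.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// What a (hypothetical) column task reports after encoding one slice.
struct SizeReport {
    column: usize,
    /// Bytes this column has encoded so far in the current row group.
    encoded_bytes: usize,
}

/// Stand-in for a detached per-column task: it receives slice row counts and
/// reports its running encoded size. `bytes_per_row` fakes the encoder.
fn spawn_column_task(column: usize, bytes_per_row: usize, reports: Sender<SizeReport>) -> Sender<usize> {
    let (tx, rx) = channel::<usize>();
    thread::spawn(move || {
        let mut encoded_bytes = 0;
        for rows in rx {
            encoded_bytes += rows * bytes_per_row;
            if reports.send(SizeReport { column, encoded_bytes }).is_err() {
                break; // the dispatcher has gone away
            }
        }
    });
    tx
}

/// The barrier: block until every column has reported on the slice just
/// dispatched, then sum the exact per-column sizes.
fn wait_for_exact_size(reports: &Receiver<SizeReport>, num_columns: usize) -> usize {
    let mut latest = vec![None::<usize>; num_columns];
    let mut pending = num_columns;
    while pending > 0 {
        let report = reports.recv().expect("a column task hung up");
        if latest[report.column].is_none() {
            pending -= 1;
        }
        latest[report.column] = Some(report.encoded_bytes);
    }
    latest.into_iter().map(|b| b.unwrap_or(0)).sum()
}

/// Dispatches each slice to every column, pausing at the barrier after each
/// one, and returns the exact row-group size observed after each slice.
fn barrier_demo(bytes_per_row: &[usize], slices: &[usize]) -> Vec<usize> {
    let (report_tx, report_rx) = channel();
    let columns: Vec<Sender<usize>> = bytes_per_row
        .iter()
        .enumerate()
        .map(|(i, &bpr)| spawn_column_task(i, bpr, report_tx.clone()))
        .collect();
    let mut sizes = Vec::new();
    for &rows in slices {
        for column in &columns {
            column.send(rows).expect("a column task hung up");
        }
        // Pipelining stops here: nothing more is dispatched until all report.
        sizes.push(wait_for_exact_size(&report_rx, columns.len()));
    }
    sizes
}
```

For example, `barrier_demo(&[4, 8, 1], &[10, 5])` yields exact sizes of 130 and then 195 bytes. Each of those numbers cost a full round trip through every column task.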
   
   
   Preferred option: passive projection, with best-effort approximate thresholds and no per-batch blocking.
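A matching sketch of passive projection uses hypothetical names throughout (`Progress`, `LatestSlot`, `publish`, `projected_group_bytes`, `should_flush`). `tokio::sync::watch` is a natural fit for a channel that keeps only the most recent value. To stay dependency-free, the sketch instead uses an `Arc<Mutex<_>>` slot, which has the same overwrite-on-send behaviour for this purpose.

```rust
use std::sync::{Arc, Mutex};

/// Latest progress a (hypothetical) column task has published.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct Progress {
    rows_written: usize,
    encoded_bytes: usize,
}

/// Latest-value slot; a real implementation might use `tokio::sync::watch`.
type LatestSlot = Arc<Mutex<Progress>>;

/// Column side: overwrite the slot; older values are simply discarded.
fn publish(slot: &LatestSlot, progress: Progress) {
    *slot.lock().unwrap() = progress;
}

/// Dispatcher side: read every slot without waiting for new reports, derive
/// each column's own average bytes-per-row, scale it by the exact number of
/// rows dispatched into the current row group, and sum across columns.
/// Returns `None` while some column has not reported yet; that is the point
/// where the proposed design blocks, once per row group.
fn projected_group_bytes(slots: &[LatestSlot], rows_dispatched: usize) -> Option<usize> {
    let mut total = 0.0_f64;
    for slot in slots {
        let progress = *slot.lock().unwrap();
        if progress.rows_written == 0 {
            return None;
        }
        let avg_row_bytes = progress.encoded_bytes as f64 / progress.rows_written as f64;
        total += avg_row_bytes * rows_dispatched as f64;
    }
    Some(total.round() as usize)
}

/// Flush decision on the projected, approximate size.
fn should_flush(slots: &[LatestSlot], rows_dispatched: usize, max_row_group_bytes: usize) -> Option<bool> {
    projected_group_bytes(slots, rows_dispatched).map(|bytes| bytes >= max_row_group_bytes)
}
```

Each column's average comes from its own rows written, which may lag behind the dispatcher. The multiplier, by contrast, is the dispatcher's exact row count, so a lagging column still contributes an estimate for every dispatched row. Because the averages can drift, the cut lands only approximately where the limit is, which is the small overshoot accepted above.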
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   Follow-up to #22649 

