devinjdangelo commented on code in PR #8923:
URL: https://github.com/apache/arrow-datafusion/pull/8923#discussion_r1461104489
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -885,24 +885,25 @@ async fn send_arrays_to_col_writers(
rb: &RecordBatch,
schema: Arc<Schema>,
) -> Result<()> {
- for (tx, array, field) in col_array_channels
- .iter()
- .zip(rb.columns())
- .zip(schema.fields())
- .map(|((a, b), c)| (a, b, c))
- {
+ // Each leaf column has its own channel, increment next_channel for each leaf column sent.
+ let mut next_channel = 0;
+ for (array, field) in rb.columns().iter().zip(schema.fields()) {
for c in compute_leaves(field, array)? {
- tx.send(c).await.map_err(|_| {
- DataFusionError::Internal("Unable to send array to writer!".into())
- })?;
+ col_array_channels[next_channel]
+ .send(c)
+ .await
+ .map_err(|_| {
+ DataFusionError::Internal("Unable to send array to writer!".into())
+ })?;
+ next_channel += 1;
Review Comment:
Per top-level field, yes. The `field` referenced in the outer loop is only ever a
top-level field, never a nested one. The `compute_leaves` function does the
recursive iteration over all nested fields of each top-level field.
There is actually an independent channel / parallel serializer for every leaf
column, including nested ones, not only for top-level fields. So we must
advance the channel we are sending to on every item yielded by `compute_leaves`,
not just on every new top-level field.
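To make the routing concrete, here is a minimal, self-contained Rust sketch. It uses blocking `std::sync::mpsc` channels instead of the async channels and Arrow arrays in `parquet.rs`, and the `Field` enum, `leaves` (a stand-in for `compute_leaves`), `send_per_leaf`, `send_per_top_level_field`, and `route` are all hypothetical names for illustration. The example schema has three top-level fields, one of which is a two-child struct, so it has four leaf columns and therefore four channels.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical, simplified stand-in for an Arrow field (not the arrow-rs type):
// either a primitive leaf column or a struct with nested children.
enum Field {
    Leaf(&'static str),
    Struct(Vec<Field>),
}

// Hypothetical analogue of `compute_leaves`: recursively flattens one
// top-level field into its leaf columns, depth first.
fn leaves(field: &Field, out: &mut Vec<String>) {
    match field {
        Field::Leaf(name) => out.push(name.to_string()),
        Field::Struct(children) => {
            for child in children {
                leaves(child, out);
            }
        }
    }
}

// Routing as in the new code: one channel per leaf, so the index advances
// on every leaf produced by the recursion, not once per top-level field.
fn send_per_leaf(channels: &[Sender<String>], fields: &[Field]) -> Result<(), String> {
    let mut next_channel = 0;
    for field in fields {
        let mut out = Vec::new();
        leaves(field, &mut out);
        for leaf in out {
            let tx = channels
                .get(next_channel)
                .ok_or_else(|| format!("no channel for leaf {leaf}"))?;
            tx.send(leaf)
                .map_err(|_| "Unable to send array to writer!".to_string())?;
            next_channel += 1;
        }
    }
    Ok(())
}

// Routing as in the removed code: channels zipped with top-level fields,
// so every leaf of a nested field lands on the same channel.
fn send_per_top_level_field(channels: &[Sender<String>], fields: &[Field]) -> Result<(), String> {
    for (tx, field) in channels.iter().zip(fields) {
        let mut out = Vec::new();
        leaves(field, &mut out);
        for leaf in out {
            tx.send(leaf)
                .map_err(|_| "Unable to send array to writer!".to_string())?;
        }
    }
    Ok(())
}

// Runs one routing strategy over fresh channels and returns what each received.
fn route(
    n_channels: usize,
    fields: &[Field],
    send: fn(&[Sender<String>], &[Field]) -> Result<(), String>,
) -> Vec<Vec<String>> {
    let (txs, rxs): (Vec<Sender<String>>, Vec<Receiver<String>>) =
        (0..n_channels).map(|_| channel()).unzip();
    send(&txs, fields).unwrap();
    drop(txs); // close all senders so each receiver's iterator terminates
    rxs.iter().map(|rx| rx.iter().collect::<Vec<String>>()).collect()
}

fn main() {
    // Schema a, s: struct<x, y>, b: three top-level fields, four leaf columns.
    let fields = vec![
        Field::Leaf("a"),
        Field::Struct(vec![Field::Leaf("s.x"), Field::Leaf("s.y")]),
        Field::Leaf("b"),
    ];
    // per leaf:  [["a"], ["s.x"], ["s.y"], ["b"]]
    println!("per leaf:  {:?}", route(4, &fields, send_per_leaf));
    // per field: [["a"], ["s.x", "s.y"], ["b"], []]
    println!("per field: {:?}", route(4, &fields, send_per_top_level_field));
}
```

With the per-leaf counter each channel receives exactly one leaf, while the zip-based loop puts both struct children on channel 1 and leaves channel 3 empty. The sketch returns an error from `.get` when leaves outnumber channels instead of panicking on an out-of-bounds index as `col_array_channels[next_channel]` would; that is a choice made for the sketch only.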
--
This is an automated message from the Apache Git Service.