devinjdangelo commented on issue #8853:
URL:
https://github.com/apache/arrow-datafusion/issues/8853#issuecomment-1890529600
I tried to make a minimal reproducer with just arrow-rs, but it appears to
work fine.
It must then be that the issue lies in the tokio-based implementation in
DataFusion.
```rust
use std::sync::Arc;
use arrow_array::*;
use arrow_schema::*;
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves,
get_column_writers};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
fn main(){
let schema = Arc::new(Schema::new(vec![
Field::new("struct", DataType::Struct(vec![
Field::new("b", DataType::Boolean, false),
Field::new("c", DataType::Int32, false),].into()), false
)
]));
// Compute the parquet schema
let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
let props = Arc::new(WriterProperties::default());
// Create writers for each of the leaf columns
let col_writers = get_column_writers(&parquet_schema, &props,
&schema).unwrap();
// Spawn a worker thread for each column
// This is for demonstration purposes, a thread-pool e.g. rayon or
tokio, would be better
let mut workers: Vec<_> = col_writers
.into_iter()
.map(|mut col_writer| {
let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
let handle = std::thread::spawn(move || {
for col in recv {
col_writer.write(&col)?;
}
col_writer.close()
});
(handle, send)
})
.collect();
// Create parquet writer
let root_schema = parquet_schema.root_schema_ptr();
let mut out = Vec::with_capacity(1024); // This could be a File
let mut writer = SerializedFileWriter::new(&mut out, root_schema,
props.clone()).unwrap();
// Start row group
let mut row_group = writer.next_row_group().unwrap();
let boolean = Arc::new(BooleanArray::from(vec![false, false, true,
true]));
let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
// Columns to encode
let to_write = vec![Arc::new(
StructArray::from(vec![
(
Arc::new(Field::new("b", DataType::Boolean, false)),
boolean.clone() as ArrayRef,
),
(
Arc::new(Field::new("c", DataType::Int32, false)),
int.clone() as ArrayRef,
),
])) as _,
];
// Spawn work to encode columns
let mut worker_iter = workers.iter_mut();
for (arr, field) in to_write.iter().zip(&schema.fields) {
for leaves in compute_leaves(field, arr).unwrap() {
worker_iter.next().unwrap().1.send(leaves).unwrap();
}
}
// Finish up parallel column encoding
for (handle, send) in workers {
drop(send); // Drop send side to signal termination
let chunk = handle.join().unwrap().unwrap();
chunk.append_to_row_group(&mut row_group).unwrap();
}
row_group.close().unwrap();
let metadata = writer.close().unwrap();
assert_eq!(metadata.num_rows, 4);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]