devinjdangelo commented on issue #8853:
URL: 
https://github.com/apache/arrow-datafusion/issues/8853#issuecomment-1890529600

   I tried to make a minimal reproducer with just arrow-rs, but it appears to 
work fine.
   
   It must be, then, that the issue lies in the tokio-based implementation in 
DataFusion. 
   
   ```rust
   
   use std::sync::Arc;
   use arrow_array::*;
   use arrow_schema::*;
   use parquet::arrow::arrow_to_parquet_schema;
   use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, 
get_column_writers};
   use parquet::file::properties::WriterProperties;
   use parquet::file::writer::SerializedFileWriter;
   
    fn main(){
        // Build an Arrow schema with a single non-nullable struct column
        // containing two non-nullable leaf fields ("b": Boolean, "c": Int32).
        let schema = Arc::new(Schema::new(vec![
            Field::new("struct", DataType::Struct(vec![
                Field::new("b", DataType::Boolean, false),
                Field::new("c", DataType::Int32, false),].into()), false
        )
        ]));
        
        // Compute the parquet schema from the Arrow schema
        let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
        let props = Arc::new(WriterProperties::default());
        
        // Create writers for each of the leaf columns
        // (one writer per parquet leaf — here the struct's two children)
        let col_writers = get_column_writers(&parquet_schema, &props, 
&schema).unwrap();
        
        // Spawn a worker thread for each column.
        // This is for demonstration purposes; a thread pool (e.g. rayon or
        // tokio) would be better.
        let mut workers: Vec<_> = col_writers
            .into_iter()
            .map(|mut col_writer| {
                // Each worker receives leaf columns over an mpsc channel and
                // encodes them; the closure returns the Result of close(), so
                // `?` propagates write errors into the joined result.
                let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
                let handle = std::thread::spawn(move || {
                    for col in recv {
                        col_writer.write(&col)?;
                    }
                    col_writer.close()
                });
                (handle, send)
            })
            .collect();
        
        // Create parquet writer over an in-memory buffer
        let root_schema = parquet_schema.root_schema_ptr();
        let mut out = Vec::with_capacity(1024); // This could be a File
        let mut writer = SerializedFileWriter::new(&mut out, root_schema, 
props.clone()).unwrap();
        
        // Start row group
        let mut row_group = writer.next_row_group().unwrap();
        
        // Four rows of leaf data for the struct's two children
        let boolean = Arc::new(BooleanArray::from(vec![false, false, true, 
true]));
        let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));
        
        // Columns to encode: a single StructArray matching the schema above
        let to_write = vec![Arc::new(
            StructArray::from(vec![
            (
                Arc::new(Field::new("b", DataType::Boolean, false)),
                boolean.clone() as ArrayRef,
            ),
            (
                Arc::new(Field::new("c", DataType::Int32, false)),
                int.clone() as ArrayRef,
            ),
        ])) as _,
        ];
        
        // Spawn work to encode columns: compute_leaves flattens each top-level
        // array into its leaf columns, which are sent to workers in order.
        // NOTE(review): this relies on compute_leaves yielding leaves in the
        // same order as get_column_writers produced the writers.
        let mut worker_iter = workers.iter_mut();
        for (arr, field) in to_write.iter().zip(&schema.fields) {
            for leaves in compute_leaves(field, arr).unwrap() {
                worker_iter.next().unwrap().1.send(leaves).unwrap();
            }
        }
        
        // Finish up parallel column encoding: closing the channel ends each
        // worker's receive loop, then the encoded chunk is appended serially.
        for (handle, send) in workers {
            drop(send); // Drop send side to signal termination
            let chunk = handle.join().unwrap().unwrap();
            chunk.append_to_row_group(&mut row_group).unwrap();
        }
        row_group.close().unwrap();
        
        // Finalize the file and check the metadata row count
        let metadata = writer.close().unwrap();
        assert_eq!(metadata.num_rows, 4);
    }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to