tustvold commented on code in PR #4871:
URL: https://github.com/apache/arrow-rs/pull/4871#discussion_r1341110187


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -347,92 +349,213 @@ impl PageWriter for ArrowPageWriter {
     }
 }
 
-/// Encodes a leaf column to [`ArrowPageWriter`]
-enum ArrowColumnWriter {
+/// A leaf column that can be encoded by [`ArrowColumnWriter`]
+pub struct ArrowLeafColumn(ArrayLevels);
+
+/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
+pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
+    let levels = calculate_array_levels(array, field)?;
+    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
+}
+
+/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
+///
+/// Note: This is a low-level interface for applications that require fine-grained control
+/// of encoding, see [`ArrowWriter`] for a higher-level interface
+///
+/// ```
+/// // The arrow schema
+/// # use std::sync::Arc;
+/// # use arrow_array::*;
+/// # use arrow_schema::*;
+/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
+/// # use parquet::file::properties::WriterProperties;
+/// # use parquet::file::writer::SerializedFileWriter;
+/// #
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("i32", DataType::Int32, false),
+///     Field::new("f32", DataType::Float32, false),
+/// ]));
+///
+/// // Compute the parquet schema
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
+/// let props = Arc::new(WriterProperties::default());
+///
+/// // Create writers for each of the leaf columns
+/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
+///
+/// // Spawn a worker thread for each column
+/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better
+/// let mut workers: Vec<_> = col_writers
+///     .into_iter()
+///     .map(|mut col_writer| {
+///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
+///         let handle = std::thread::spawn(move || {
+///             for col in recv {
+///                 col_writer.write(&col)?;
+///             }
+///             col_writer.close()
+///         });
+///         (handle, send)
+///     })
+///     .collect();
+///
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// let mut out = Vec::with_capacity(1024); // This could be a File
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
+///
+/// // Start row group
+/// let mut row_group = writer.next_row_group().unwrap();
+///
+/// // Columns to encode
+/// let to_write = vec![
+///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
+///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
+/// ];
+///
+/// // Spawn work to encode columns
+/// let mut worker_iter = workers.iter_mut();
+/// for (a, f) in to_write.iter().zip(&schema.fields) {
+///     for c in compute_leaves(f, a).unwrap() {
+///         worker_iter.next().unwrap().1.send(c).unwrap();
+///     }
+/// }
+///
+/// // Finish up parallel column encoding
+/// for (handle, send) in workers {
+///     drop(send); // Drop send side to signal termination
+///     let (chunk, result) = handle.join().unwrap().unwrap();
+///     row_group.append_column(&chunk, result).unwrap();
+/// }
+/// row_group.close().unwrap();
+///
+/// let metadata = writer.close().unwrap();
+/// assert_eq!(metadata.num_rows, 3);
+/// ```
+pub struct ArrowColumnWriter {
+    writer: ArrowColumnWriterImpl,
+    chunk: SharedColumnChunk,
+}
+
+enum ArrowColumnWriterImpl {
     ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
     Column(ColumnWriter<'static>),
 }
 
 impl ArrowColumnWriter {
+    /// Write an [`ArrowLeafColumn`]
+    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
+        match &mut self.writer {
+            ArrowColumnWriterImpl::Column(c) => {
+                write_leaf(c, &col.0)?;
+            }
+            ArrowColumnWriterImpl::ByteArray(c) => {
+                write_primitive(c, col.0.array().as_ref(), &col.0)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Close this column returning the [`ArrowColumnChunk`] and [`ColumnCloseResult`]
+    pub fn close(self) -> Result<(ArrowColumnChunk, ColumnCloseResult)> {

Review Comment:
   I tried to address this in https://github.com/apache/arrow-rs/pull/4871/commits/a4c17a902c11c4371eef4bd95d73a7bc8f951238, PTAL
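
For reference, the `write`/`close` flow in the doc example above can also be driven without threads. The following is a minimal single-threaded sketch of the same low-level API this PR adds; the one-column schema and sample data are illustrative only, not part of the PR:

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_to_parquet_schema;
use parquet::arrow::arrow_writer::{compute_leaves, get_column_writers};
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

fn main() -> Result<()> {
    // Illustrative single-column schema
    let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    let parquet_schema = arrow_to_parquet_schema(schema.as_ref())?;
    let props = Arc::new(WriterProperties::default());

    // One ArrowColumnWriter per leaf column, encoding independently of any IO
    let mut col_writers = get_column_writers(&parquet_schema, &props, &schema)?;

    let mut buffer = Vec::new(); // This could be a File
    let mut writer =
        SerializedFileWriter::new(&mut buffer, parquet_schema.root_schema_ptr(), props.clone())?;
    let mut row_group = writer.next_row_group()?;

    // Decompose the array into leaf columns and feed them to the matching writer
    let array = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
    for leaf in compute_leaves(schema.field(0), &array)? {
        col_writers[0].write(&leaf)?;
    }

    // Close each column writer and append its encoded chunk in schema order
    for col_writer in col_writers {
        let (chunk, result) = col_writer.close()?;
        row_group.append_column(&chunk, result)?;
    }
    row_group.close()?;

    let metadata = writer.close()?;
    assert_eq!(metadata.num_rows, 3);
    Ok(())
}
```

The key design point, as the doc example exploits, is that encoding (`ArrowColumnWriter`) is decoupled from IO (`SerializedFileWriter`), so the per-column encode step above can be farmed out to worker threads while the chunks are still appended to the row group in schema order.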


