tustvold commented on code in PR #4871:
URL: https://github.com/apache/arrow-rs/pull/4871#discussion_r1340670691


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -347,92 +349,213 @@ impl PageWriter for ArrowPageWriter {
     }
 }
 
-/// Encodes a leaf column to [`ArrowPageWriter`]
-enum ArrowColumnWriter {
+/// A leaf column that can be encoded by [`ArrowColumnWriter`]
+pub struct ArrowLeafColumn(ArrayLevels);
+
+/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`]
+pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result<Vec<ArrowLeafColumn>> {
+    let levels = calculate_array_levels(array, field)?;
+    Ok(levels.into_iter().map(ArrowLeafColumn).collect())
+}
+
+/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
+///
+/// Note: This is a low-level interface for applications that require fine-grained control
+/// of encoding, see [`ArrowWriter`] for a higher-level interface
+///
+/// ```
+/// // The arrow schema
+/// # use std::sync::Arc;
+/// # use arrow_array::*;
+/// # use arrow_schema::*;
+/// # use parquet::arrow::arrow_to_parquet_schema;
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
+/// # use parquet::file::properties::WriterProperties;
+/// # use parquet::file::writer::SerializedFileWriter;
+/// #
+/// let schema = Arc::new(Schema::new(vec![
+///     Field::new("i32", DataType::Int32, false),
+///     Field::new("f32", DataType::Float32, false),
+/// ]));
+///
+/// // Compute the parquet schema
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
+/// let props = Arc::new(WriterProperties::default());
+///
+/// // Create writers for each of the leaf columns
+/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
+///
+/// // Spawn a worker thread for each column
+/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better
+/// let mut workers: Vec<_> = col_writers
+///     .into_iter()
+///     .map(|mut col_writer| {
+///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
+///         let handle = std::thread::spawn(move || {
+///             for col in recv {
+///                 col_writer.write(&col)?;
+///             }
+///             col_writer.close()
+///         });
+///         (handle, send)
+///     })
+///     .collect();
+///
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// let mut out = Vec::with_capacity(1024); // This could be a File
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
+///
+/// // Start row group
+/// let mut row_group = writer.next_row_group().unwrap();
+///
+/// // Columns to encode
+/// let to_write = vec![
+///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
+///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
+/// ];
+///
+/// // Spawn work to encode columns
+/// let mut worker_iter = workers.iter_mut();
+/// for (a, f) in to_write.iter().zip(&schema.fields) {
+///     for c in compute_leaves(f, a).unwrap() {
+///         worker_iter.next().unwrap().1.send(c).unwrap();
+///     }
+/// }
+///
+/// // Finish up parallel column encoding
+/// for (handle, send) in workers {
+///     drop(send); // Drop send side to signal termination
+///     let (chunk, result) = handle.join().unwrap().unwrap();
+///     row_group.append_column(&chunk, result).unwrap();
+/// }
+/// row_group.close().unwrap();
+///
+/// let metadata = writer.close().unwrap();
+/// assert_eq!(metadata.num_rows, 3);
+/// ```
+pub struct ArrowColumnWriter {
+    writer: ArrowColumnWriterImpl,
+    chunk: SharedColumnChunk,
+}
+
+enum ArrowColumnWriterImpl {
     ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>),
     Column(ColumnWriter<'static>),
 }
 
 impl ArrowColumnWriter {
+    /// Write an [`ArrowLeafColumn`]
+    pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> {
+        match &mut self.writer {
+            ArrowColumnWriterImpl::Column(c) => {
+                write_leaf(c, &col.0)?;
+            }
+            ArrowColumnWriterImpl::ByteArray(c) => {
+                write_primitive(c, col.0.array().as_ref(), &col.0)?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Close this column returning the [`ArrowColumnChunk`] and [`ColumnCloseResult`]
+    pub fn close(self) -> Result<(ArrowColumnChunk, ColumnCloseResult)> {

Review Comment:
   This wouldn't be usable with append_column unless you first exploded it into 
parts, at which point...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to