This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new ac759d5692 Minor: Improve parallel parquet encoding example (#7323)
ac759d5692 is described below

commit ac759d56924270c05907036e14f9383f70a417f3
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Mar 26 12:45:11 2025 -0400

    Minor: Improve parallel parquet encoding example (#7323)
    
    * Minor: Improve parallel parquet encoding example
    
    * Apply suggestions from code review
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    ---------
    
    Co-authored-by: Ed Seidl <[email protected]>
---
 parquet/src/arrow/arrow_writer/mod.rs | 41 +++++++++++++++++++++++------------
 1 file changed, 27 insertions(+), 14 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index fb441f4d33..e0392f65d4 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -530,8 +530,9 @@ impl ArrowColumnChunk {
 
 /// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
 ///
-/// Note: This is a low-level interface for applications that require fine-grained control
-/// of encoding, see [`ArrowWriter`] for a higher-level interface
+/// Note: This is a low-level interface for applications that require
+/// fine-grained control of encoding (e.g. encoding using multiple threads),
+/// see [`ArrowWriter`] for a higher-level interface
 ///
 /// # Example: Encoding two Arrow Array's in Parallel
 /// ```
@@ -540,9 +541,9 @@ impl ArrowColumnChunk {
 /// # use arrow_array::*;
 /// # use arrow_schema::*;
 /// # use parquet::arrow::ArrowSchemaConverter;
-/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers};
+/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
 /// # use parquet::file::properties::WriterProperties;
-/// # use parquet::file::writer::SerializedFileWriter;
+/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 /// #
 /// let schema = Arc::new(Schema::new(vec![
 ///     Field::new("i32", DataType::Int32, false),
@@ -560,15 +561,20 @@ impl ArrowColumnChunk {
 /// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
 ///
 /// // Spawn a worker thread for each column
-/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better
+/// //
+/// // Note: This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better.
+/// // The `map` produces an iterator of type `tuple of (thread handle, send channel)`.
 /// let mut workers: Vec<_> = col_writers
 ///     .into_iter()
 ///     .map(|mut col_writer| {
 ///         let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
 ///         let handle = std::thread::spawn(move || {
+///             // receive Arrays to encode via the channel
 ///             for col in recv {
 ///                 col_writer.write(&col)?;
 ///             }
+///             // once the input is complete, close the writer
+///             // to return the newly created ArrowColumnChunk
 ///             col_writer.close()
 ///         });
 ///         (handle, send)
@@ -577,19 +583,23 @@ impl ArrowColumnChunk {
 ///
 /// // Create parquet writer
 /// let root_schema = parquet_schema.root_schema_ptr();
-/// let mut out = Vec::with_capacity(1024); // This could be a File
-/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap();
+/// // write to memory in the example, but this could be a File
+/// let mut out = Vec::with_capacity(1024);
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
+///   .unwrap();
 ///
 /// // Start row group
-/// let mut row_group = writer.next_row_group().unwrap();
+/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
+///   .next_row_group()
+///   .unwrap();
 ///
-/// // Columns to encode
+/// // Create some example input columns to encode
 /// let to_write = vec![
 ///     Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _,
 ///     Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _,
 /// ];
 ///
-/// // Spawn work to encode columns
+/// // Send the input columns to the workers
 /// let mut worker_iter = workers.iter_mut();
 /// for (arr, field) in to_write.iter().zip(&schema.fields) {
 ///     for leaves in compute_leaves(field, arr).unwrap() {
@@ -597,13 +607,16 @@ impl ArrowColumnChunk {
 ///     }
 /// }
 ///
-/// // Finish up parallel column encoding
+/// // Wait for the workers to complete encoding, and append
+/// // the resulting column chunks to the row group (and the file)
 /// for (handle, send) in workers {
 ///     drop(send); // Drop send side to signal termination
-///     let chunk = handle.join().unwrap().unwrap();
-///     chunk.append_to_row_group(&mut row_group).unwrap();
+///     // wait for the worker to send the completed chunk
+///     let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
+///     chunk.append_to_row_group(&mut row_group_writer).unwrap();
 /// }
-/// row_group.close().unwrap();
+/// // Close the row group which writes to the underlying file
+/// row_group_writer.close().unwrap();
 ///
 /// let metadata = writer.close().unwrap();
 /// assert_eq!(metadata.num_rows, 3);

Reply via email to