This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 1eb62bd391 Unify API for writing column chunks / row groups in parallel (#8582)
1eb62bd391 is described below
commit 1eb62bd39119a003d5b240108055e3a6697a1cfd
Author: Adam Reeve <[email protected]>
AuthorDate: Wed Oct 15 08:51:44 2025 +1300
Unify API for writing column chunks / row groups in parallel (#8582)
# Which issue does this PR close?
- Closes #8389.
# Rationale for this change
Simplify the API surface by providing only one way to write column chunks and
row groups in parallel.
# What changes are included in this PR?
* Make the `ArrowRowGroupWriterFactory` constructor public and simplify it
by removing arguments that are already available from the `SerializedFileWriter`.
* Update `ArrowColumnWriter` example and test code to use the
`ArrowRowGroupWriterFactory`.
* Deprecate `parquet::arrow::arrow_writer::get_column_writers` and
`parquet::arrow::arrow_writer::ArrowWriter::into_serialized_writer` (see the sketch after this list).
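
For reference, the unified flow after this change looks roughly like the following. This is a minimal, sequential sketch adapted from the updated `ArrowColumnWriter` doc example (a single `Int32` column written to an in-memory buffer); in real use the column writers would typically be moved onto worker threads:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowSchemaConverter;
use parquet::arrow::arrow_writer::{compute_leaves, ArrowRowGroupWriterFactory};
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;

fn main() -> parquet::errors::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
    let props = Arc::new(WriterProperties::default());

    // Convert the Arrow schema to a Parquet schema
    let parquet_schema = ArrowSchemaConverter::new()
        .with_coerce_types(props.coerce_types())
        .convert(&schema)?;

    // The SerializedFileWriter owns the schema and properties, so the
    // factory no longer needs them passed separately
    let mut out = Vec::with_capacity(1024);
    let mut writer =
        SerializedFileWriter::new(&mut out, parquet_schema.root_schema_ptr(), props)?;
    let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));

    // Create column writers for the 0th row group
    let mut col_writers = factory.create_column_writers(0)?;

    // Encode a batch: one ArrowLeafColumn per leaf, fed to the matching writer
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    let mut writer_iter = col_writers.iter_mut();
    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        for leaf in compute_leaves(field, column)? {
            writer_iter.next().unwrap().write(&leaf)?;
        }
    }

    // Append the encoded chunks as a new row group and finish the file
    let mut row_group = writer.next_row_group()?;
    for col_writer in col_writers {
        col_writer.close()?.append_to_row_group(&mut row_group)?;
    }
    row_group.close()?;
    writer.close()?;
    Ok(())
}
```

The factory clones the schema descriptor and properties `Arc`s out of the file writer rather than borrowing it, so the writer remains usable for `next_row_group` afterwards.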
# Are these changes tested?
Yes, covered by existing tests.
# Are there any user-facing changes?
Yes: this deprecates existing public methods. A sketch of the parallel-encoding pattern, useful when migrating callers, follows below.
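
For code currently calling the deprecated methods, the worker-thread pattern from the updated doc example still applies. Below is a hedged sketch (the helper name `encode_row_group_parallel` is illustrative, not part of the API) of moving each `ArrowColumnWriter` onto its own thread:

```rust
use std::sync::mpsc;
use std::thread;

use arrow_array::RecordBatch;
use parquet::arrow::arrow_writer::{
    compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
};
use parquet::errors::Result;

/// Encode `batches` into the given column writers, one worker thread per leaf
/// column, returning the finished chunks in column order.
fn encode_row_group_parallel(
    col_writers: Vec<ArrowColumnWriter>,
    batches: &[RecordBatch],
) -> Result<Vec<ArrowColumnChunk>> {
    // Spawn a worker per leaf column; each receives its leaves over a channel
    let mut workers: Vec<_> = col_writers
        .into_iter()
        .map(|mut writer| {
            let (tx, rx) = mpsc::channel::<ArrowLeafColumn>();
            let handle = thread::spawn(move || {
                for leaf in rx {
                    writer.write(&leaf)?;
                }
                writer.close()
            });
            (tx, handle)
        })
        .collect();

    // Fan the leaves of each batch out to the workers in leaf order
    for batch in batches {
        let mut worker_iter = workers.iter_mut();
        for (field, column) in batch.schema_ref().fields().iter().zip(batch.columns()) {
            for leaf in compute_leaves(field, column)? {
                worker_iter.next().unwrap().0.send(leaf).unwrap();
            }
        }
    }

    // Close the channels and collect the encoded column chunks
    workers
        .into_iter()
        .map(|(tx, handle)| {
            drop(tx); // no more leaves; lets the worker loop terminate
            handle.join().unwrap()
        })
        .collect()
}
```

The returned chunks are then appended via `append_to_row_group` exactly as in the sequential sketch above.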
---
parquet/benches/arrow_writer.rs | 14 ++-
parquet/src/arrow/arrow_writer/mod.rs | 130 ++++++++++++---------------
parquet/src/file/writer.rs | 6 ++
parquet/tests/encryption/encryption_async.rs | 36 +++++---
4 files changed, 96 insertions(+), 90 deletions(-)
diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index 6053e02466..b92f0788b2 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -19,7 +19,7 @@
extern crate criterion;
use criterion::{Bencher, Criterion, Throughput};
-use parquet::arrow::arrow_writer::compute_leaves;
+use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
use parquet::basic::{Compression, ZstdLevel};
extern crate arrow;
@@ -33,8 +33,10 @@ use arrow::datatypes::*;
use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array};
use arrow::{record_batch::RecordBatch, util::data_gen::*};
use arrow_array::RecordBatchOptions;
+use parquet::arrow::ArrowSchemaConverter;
+use parquet::errors::Result;
use parquet::file::properties::{WriterProperties, WriterVersion};
-use parquet::{arrow::ArrowWriter, errors::Result};
+use parquet::file::writer::SerializedFileWriter;
fn create_primitive_bench_batch(
size: usize,
@@ -341,8 +343,12 @@ fn write_batch_with_option(
props: Option<WriterProperties>,
) -> Result<()> {
let mut file = Empty::default();
- let writer = ArrowWriter::try_new(&mut file, batch.schema(), props)?;
- let (_, row_group_writer_factory) = writer.into_serialized_writer()?;
+ let props = Arc::new(props.unwrap_or_default());
+ let parquet_schema = ArrowSchemaConverter::new()
+ .with_coerce_types(props.coerce_types())
+ .convert(batch.schema_ref())?;
+ let writer = SerializedFileWriter::new(&mut file, parquet_schema.root_schema_ptr(), props)?;
+ let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, batch.schema());
bench.iter(|| {
let mut row_group = row_group_writer_factory.create_column_writers(0).unwrap();
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 66801d2d38..c2a7a6376f 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -48,7 +48,7 @@ use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
-use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
use levels::{ArrayLevels, calculate_array_levels};
mod byte_array;
@@ -252,7 +252,7 @@ impl<W: Write + Send> ArrowWriter<W> {
SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
let row_group_writer_factory =
- ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
+ ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
Ok(Self {
writer: file_writer,
@@ -423,7 +423,10 @@ impl<W: Write + Send> ArrowWriter<W> {
}
/// Create a new row group writer and return its column writers.
- #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
+ #[deprecated(
+ since = "56.2.0",
+ note = "Use `ArrowRowGroupWriterFactory` instead, see
`ArrowColumnWriter` for an example"
+ )]
pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
self.flush()?;
let in_progress = self
@@ -433,7 +436,10 @@ impl<W: Write + Send> ArrowWriter<W> {
}
/// Append the given column chunks to the file as a new row group.
- #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
+ #[deprecated(
+ since = "56.2.0",
+ note = "Use `SerializedFileWriter` directly instead, see
`ArrowColumnWriter` for an example"
+ )]
pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
let mut row_group_writer = self.writer.next_row_group()?;
for chunk in chunks {
@@ -445,6 +451,10 @@ impl<W: Write + Send> ArrowWriter<W> {
/// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
/// This can be useful to provide more control over how files are written.
+ #[deprecated(
+ since = "57.0.0",
+ note = "Construct a `SerializedFileWriter` and
`ArrowRowGroupWriterFactory` directly instead"
+ )]
pub fn into_serialized_writer(
mut self,
) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
@@ -693,6 +703,8 @@ impl ArrowColumnChunk {
/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
///
+/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
+///
/// Note: This is a low-level interface for applications that require
/// fine-grained control of encoding (e.g. encoding using multiple threads),
/// see [`ArrowWriter`] for a higher-level interface
@@ -704,7 +716,7 @@ impl ArrowColumnChunk {
/// # use arrow_array::*;
/// # use arrow_schema::*;
/// # use parquet::arrow::ArrowSchemaConverter;
-/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
+/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
/// # use parquet::file::properties::WriterProperties;
/// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
/// #
@@ -720,8 +732,17 @@ impl ArrowColumnChunk {
/// .convert(&schema)
/// .unwrap();
///
-/// // Create writers for each of the leaf columns
-/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// // write to memory in the example, but this could be a File
+/// let mut out = Vec::with_capacity(1024);
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
+/// .unwrap();
+///
+/// // Create a factory for building Arrow column writers
+/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
+/// // Create column writers for the 0th row group
+/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
///
/// // Spawn a worker thread for each column
/// //
@@ -744,13 +765,6 @@ impl ArrowColumnChunk {
/// })
/// .collect();
///
-/// // Create parquet writer
-/// let root_schema = parquet_schema.root_schema_ptr();
-/// // write to memory in the example, but this could be a File
-/// let mut out = Vec::with_capacity(1024);
-/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
-/// .unwrap();
-///
/// // Start row group
/// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
/// .next_row_group()
@@ -894,7 +908,7 @@ impl ArrowRowGroupWriter {
/// Factory that creates new column writers for each row group in the Parquet file.
pub struct ArrowRowGroupWriterFactory {
- schema: SchemaDescriptor,
+ schema: SchemaDescPtr,
arrow_schema: SchemaRef,
props: WriterPropertiesPtr,
#[cfg(feature = "encryption")]
@@ -902,61 +916,57 @@ pub struct ArrowRowGroupWriterFactory {
}
impl ArrowRowGroupWriterFactory {
- #[cfg(feature = "encryption")]
- fn new<W: Write + Send>(
+ /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
+ pub fn new<W: Write + Send>(
file_writer: &SerializedFileWriter<W>,
- schema: SchemaDescriptor,
arrow_schema: SchemaRef,
- props: WriterPropertiesPtr,
) -> Self {
+ let schema = Arc::clone(file_writer.schema_descr_ptr());
+ let props = Arc::clone(file_writer.properties());
Self {
schema,
arrow_schema,
props,
+ #[cfg(feature = "encryption")]
file_encryptor: file_writer.file_encryptor(),
}
}
- #[cfg(not(feature = "encryption"))]
- fn new<W: Write + Send>(
- _file_writer: &SerializedFileWriter<W>,
- schema: SchemaDescriptor,
- arrow_schema: SchemaRef,
- props: WriterPropertiesPtr,
- ) -> Self {
- Self {
- schema,
- arrow_schema,
- props,
+ fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
+ let writers = self.create_column_writers(row_group_index)?;
+ Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
+ }
+
+ /// Create column writers for a new row group.
+ pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
+ let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
+ let mut leaves = self.schema.columns().iter();
+ let column_factory = self.column_writer_factory(row_group_index);
+ for field in &self.arrow_schema.fields {
+ column_factory.get_arrow_column_writer(
+ field.data_type(),
+ &self.props,
+ &mut leaves,
+ &mut writers,
+ )?;
}
+ Ok(writers)
}
#[cfg(feature = "encryption")]
- fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
- let writers = get_column_writers_with_encryptor(
- &self.schema,
- &self.props,
- &self.arrow_schema,
- self.file_encryptor.clone(),
- row_group_index,
- )?;
- Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
+ fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
+ ArrowColumnWriterFactory::new()
+ .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
}
#[cfg(not(feature = "encryption"))]
- fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
- let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
- Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
- }
-
- /// Create column writers for a new row group.
- pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
- let rg_writer = self.create_row_group_writer(row_group_index)?;
- Ok(rg_writer.writers)
+ fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
+ ArrowColumnWriterFactory::new()
}
}
/// Returns [`ArrowColumnWriter`]s for each column in a given schema
+#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
pub fn get_column_writers(
parquet: &SchemaDescriptor,
props: &WriterPropertiesPtr,
@@ -976,30 +986,6 @@ pub fn get_column_writers(
Ok(writers)
}
-/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
-#[cfg(feature = "encryption")]
-fn get_column_writers_with_encryptor(
- parquet: &SchemaDescriptor,
- props: &WriterPropertiesPtr,
- arrow: &SchemaRef,
- file_encryptor: Option<Arc<FileEncryptor>>,
- row_group_index: usize,
-) -> Result<Vec<ArrowColumnWriter>> {
- let mut writers = Vec::with_capacity(arrow.fields.len());
- let mut leaves = parquet.columns().iter();
- let column_factory =
- ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
- for field in &arrow.fields {
- column_factory.get_arrow_column_writer(
- field.data_type(),
- props,
- &mut leaves,
- &mut writers,
- )?;
- }
- Ok(writers)
-}
-
/// Creates [`ArrowColumnWriter`] instances
struct ArrowColumnWriterFactory {
#[cfg(feature = "encryption")]
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 4533d25401..a77ce266d1 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -392,6 +392,12 @@ impl<W: Write + Send> SerializedFileWriter<W> {
&self.descr
}
+ /// Returns a reference to the schema descriptor `Arc`.
+ #[cfg(feature = "arrow")]
+ pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
+ &self.descr
+ }
+
/// Returns a reference to the writer properties
pub fn properties(&self) -> &WriterPropertiesPtr {
&self.props
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 6da3d2d11e..ccbb2b0bff 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -24,13 +24,14 @@ use crate::encryption_util::{
use arrow_array::RecordBatch;
use arrow_schema::Schema;
use futures::TryStreamExt;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::arrow_writer::{
ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
ArrowWriterOptions, compute_leaves,
};
-use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
+use parquet::arrow::{
+ ArrowSchemaConverter, ArrowWriter, AsyncArrowWriter, ParquetRecordBatchStreamBuilder,
+};
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
@@ -696,18 +697,22 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() {
}
});
- let props = Some(
+ let props = Arc::new(
WriterPropertiesBuilder::default()
.with_file_encryption_properties(file_encryption_properties)
.build(),
);
+ let parquet_schema = ArrowSchemaConverter::new()
+ .with_coerce_types(props.coerce_types())
+ .convert(schema)
+ .unwrap();
// Create a temporary file to write the encrypted data
let temp_file = tempfile::tempfile().unwrap();
- let arrow_writer =
- ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();
- let (writer, row_group_writer_factory) = arrow_writer.into_serialized_writer().unwrap();
+ let writer =
+ SerializedFileWriter::new(&temp_file, parquet_schema.root_schema_ptr(), props).unwrap();
+ let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(schema));
let max_row_groups = 1;
let (serialize_tx, serialize_rx) =
@@ -757,19 +762,22 @@ async fn test_multi_threaded_encrypted_writing() {
read_encrypted_file(&file, decryption_properties.clone()).unwrap();
let schema = metadata.schema().clone();
- let props = Some(
+ let props = Arc::new(
WriterPropertiesBuilder::default()
.with_file_encryption_properties(file_encryption_properties)
.build(),
);
+ let parquet_schema = ArrowSchemaConverter::new()
+ .with_coerce_types(props.coerce_types())
+ .convert(&schema)
+ .unwrap();
+
// Create a temporary file to write the encrypted data
let temp_file = tempfile::tempfile().unwrap();
- let writer =
- ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();
-
- let (mut serialized_file_writer, row_group_writer_factory) =
- writer.into_serialized_writer().unwrap();
+ let mut writer =
+ SerializedFileWriter::new(&temp_file, parquet_schema.root_schema_ptr(), props).unwrap();
+ let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
let (serialize_tx, mut serialize_rx) =
tokio::sync::mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(1);
@@ -805,7 +813,7 @@ async fn test_multi_threaded_encrypted_writing() {
// Append the finalized row groups to the SerializedFileWriter
while let Some(task) = serialize_rx.recv().await {
let (arrow_column_chunks, _) = task.await.unwrap().unwrap();
- let mut row_group_writer = serialized_file_writer.next_row_group().unwrap();
+ let mut row_group_writer = writer.next_row_group().unwrap();
for chunk in arrow_column_chunks {
chunk.append_to_row_group(&mut row_group_writer).unwrap();
}
@@ -815,7 +823,7 @@ async fn test_multi_threaded_encrypted_writing() {
// Wait for data generator and serialization task to finish
data_generator.await.unwrap();
launch_serialization_task.await.unwrap();
- let metadata = serialized_file_writer.close().unwrap();
+ let metadata = writer.close().unwrap();
// Close the file writer which writes the footer
assert_eq!(metadata.file_metadata().num_rows(), 50);