This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ce589325d8 parquet: Add ArrowWriterOptions to skip embedding the arrow metadata (#5299)
ce589325d8 is described below
commit ce589325d87b7ba55d3af74ad77f7cee9ff3c95b
Author: Yingwen <[email protected]>
AuthorDate: Mon Jan 22 18:59:56 2024 +0800
parquet: Add ArrowWriterOptions to skip embedding the arrow metadata (#5299)
* feat(parquet): Add ArrowWriterOptions
* test(parquet): test skip_arrow_metadata
* feat(parquet): Add try_new_with_options to async writer
* refactor: move WriterProperties to ArrowWriterOptions
---
parquet/src/arrow/arrow_writer/mod.rs | 87 +++++++++++++++++++++++++++++++++--
parquet/src/arrow/async_writer/mod.rs | 23 ++++++++-
2 files changed, 106 insertions(+), 4 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 3563348791..d9771838ad 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -119,11 +119,27 @@ impl<W: Write + Send> ArrowWriter<W> {
writer: W,
arrow_schema: SchemaRef,
props: Option<WriterProperties>,
+ ) -> Result<Self> {
+ let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+ Self::try_new_with_options(writer, arrow_schema, options)
+ }
+
+ /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
+ ///
+ /// The writer will fail if:
+ /// * a `SerializedFileWriter` cannot be created from the ParquetWriter
+ /// * the Arrow schema contains unsupported datatypes such as Unions
+ pub fn try_new_with_options(
+ writer: W,
+ arrow_schema: SchemaRef,
+ options: ArrowWriterOptions,
) -> Result<Self> {
let schema = arrow_to_parquet_schema(&arrow_schema)?;
- // add serialized arrow schema
- let mut props = props.unwrap_or_default();
- add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+ let mut props = options.properties;
+ if !options.skip_arrow_metadata {
+ // add serialized arrow schema
+ add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+ }
let max_row_group_size = props.max_row_group_size();
@@ -245,6 +261,38 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
}
}
+/// Arrow-specific configuration settings for writing parquet files.
+///
+/// See [`ArrowWriter`] for how to configure the writer.
+#[derive(Debug, Clone, Default)]
+pub struct ArrowWriterOptions {
+ properties: WriterProperties,
+ skip_arrow_metadata: bool,
+}
+
+impl ArrowWriterOptions {
+ /// Creates a new [`ArrowWriterOptions`] with the default settings.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Sets the [`WriterProperties`] for writing parquet files.
+ pub fn with_properties(self, properties: WriterProperties) -> Self {
+ Self { properties, ..self }
+ }
+
+ /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
+ /// by default.
+ ///
+ /// Set `skip_arrow_metadata` to true, to skip encoding this.
+ pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
+ Self {
+ skip_arrow_metadata,
+ ..self
+ }
+ }
+}
+
/// A single column chunk produced by [`ArrowColumnWriter`]
#[derive(Default)]
struct ArrowColumnChunkData {
@@ -904,6 +952,7 @@ mod tests {
use std::sync::Arc;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+ use crate::arrow::ARROW_SCHEMA_META_KEY;
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
use arrow::error::Result as ArrowResult;
@@ -2882,4 +2931,36 @@ mod tests {
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
}
+
+ #[test]
+ fn test_arrow_writer_skip_metadata() {
+ let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
+ let file_schema = Arc::new(batch_schema.clone());
+
+ let batch = RecordBatch::try_new(
+ Arc::new(batch_schema),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+ )
+ .unwrap();
+ let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer =
+ ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let bytes = Bytes::from(buf);
+ let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+ assert_eq!(file_schema, *reader_builder.schema());
+ if let Some(key_value_metadata) = reader_builder
+ .metadata()
+ .file_metadata()
+ .key_value_metadata()
+ {
+ assert!(!key_value_metadata
+ .iter()
+ .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
+ }
+ }
}
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 30080c579e..3f3da1a5f9 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -54,6 +54,7 @@
use std::{io::Write, sync::Arc};
use crate::{
+ arrow::arrow_writer::ArrowWriterOptions,
arrow::ArrowWriter,
errors::{ParquetError, Result},
file::properties::WriterProperties,
@@ -97,9 +98,29 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
arrow_schema: SchemaRef,
buffer_size: usize,
props: Option<WriterProperties>,
+ ) -> Result<Self> {
+ let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+ Self::try_new_with_options(writer, arrow_schema, buffer_size, options)
+ }
+
+ /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
+ ///
+ /// `buffer_size` determines the number of bytes to buffer before flushing
+ /// to the underlying [`AsyncWrite`]
+ ///
+ /// The intermediate buffer will automatically be resized if necessary
+ ///
+ /// [`Self::write`] will flush this intermediate buffer if it is at least
+ /// half full
+ pub fn try_new_with_options(
+ writer: W,
+ arrow_schema: SchemaRef,
+ buffer_size: usize,
+ options: ArrowWriterOptions,
) -> Result<Self> {
let shared_buffer = SharedBuffer::new(buffer_size);
- let sync_writer = ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?;
+ let sync_writer =
+ ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, options)?;
Ok(Self {
sync_writer,