This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ce589325d8 parquet: Add ArrowWriterOptions to skip embedding the arrow metadata (#5299)
ce589325d8 is described below

commit ce589325d87b7ba55d3af74ad77f7cee9ff3c95b
Author: Yingwen <[email protected]>
AuthorDate: Mon Jan 22 18:59:56 2024 +0800

    parquet: Add ArrowWriterOptions to skip embedding the arrow metadata (#5299)
    
    * feat(parquet): Add ArrowWriterOptions
    
    * test(parquet): test skip_arrow_metadata
    
    * feat(parquet): Add try_new_with_options to async writer
    
    * refactor: move WriterProperties to ArrowWriterOptions
---
 parquet/src/arrow/arrow_writer/mod.rs | 87 +++++++++++++++++++++++++++++++++--
 parquet/src/arrow/async_writer/mod.rs | 23 ++++++++-
 2 files changed, 106 insertions(+), 4 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 3563348791..d9771838ad 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -119,11 +119,27 @@ impl<W: Write + Send> ArrowWriter<W> {
         writer: W,
         arrow_schema: SchemaRef,
         props: Option<WriterProperties>,
+    ) -> Result<Self> {
+        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+        Self::try_new_with_options(writer, arrow_schema, options)
+    }
+
+    /// Try to create a new Arrow writer with [`ArrowWriterOptions`].
+    ///
+    /// The writer will fail if:
+    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    ///  * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        options: ArrowWriterOptions,
     ) -> Result<Self> {
         let schema = arrow_to_parquet_schema(&arrow_schema)?;
-        // add serialized arrow schema
-        let mut props = props.unwrap_or_default();
-        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        let mut props = options.properties;
+        if !options.skip_arrow_metadata {
+            // add serialized arrow schema
+            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+        }
 
         let max_row_group_size = props.max_row_group_size();
 
@@ -245,6 +261,38 @@ impl<W: Write + Send> RecordBatchWriter for ArrowWriter<W> {
     }
 }
 
+/// Arrow-specific configuration settings for writing parquet files.
+///
+/// See [`ArrowWriter`] for how to configure the writer.
+#[derive(Debug, Clone, Default)]
+pub struct ArrowWriterOptions {
+    properties: WriterProperties,
+    skip_arrow_metadata: bool,
+}
+
+impl ArrowWriterOptions {
+    /// Creates a new [`ArrowWriterOptions`] with the default settings.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the [`WriterProperties`] for writing parquet files.
+    pub fn with_properties(self, properties: WriterProperties) -> Self {
+        Self { properties, ..self }
+    }
+
+    /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
+    /// by default.
+    ///
+    /// Set `skip_arrow_metadata` to true, to skip encoding this.
+    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
+        Self {
+            skip_arrow_metadata,
+            ..self
+        }
+    }
+}
+
 /// A single column chunk produced by [`ArrowColumnWriter`]
 #[derive(Default)]
 struct ArrowColumnChunkData {
@@ -904,6 +952,7 @@ mod tests {
     use std::sync::Arc;
 
     use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+    use crate::arrow::ARROW_SCHEMA_META_KEY;
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Field, Schema, UInt32Type, UInt8Type};
     use arrow::error::Result as ArrowResult;
@@ -2882,4 +2931,36 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    #[test]
+    fn test_arrow_writer_skip_metadata() {
+        let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
+        let file_schema = Arc::new(batch_schema.clone());
+
+        let batch = RecordBatch::try_new(
+            Arc::new(batch_schema),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+        )
+        .unwrap();
+        let skip_options = ArrowWriterOptions::new().with_skip_arrow_metadata(true);
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer =
+            ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let bytes = Bytes::from(buf);
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+        assert_eq!(file_schema, *reader_builder.schema());
+        if let Some(key_value_metadata) = reader_builder
+            .metadata()
+            .file_metadata()
+            .key_value_metadata()
+        {
+            assert!(!key_value_metadata
+                .iter()
+                .any(|kv| kv.key.as_str() == ARROW_SCHEMA_META_KEY));
+        }
+    }
 }
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 30080c579e..3f3da1a5f9 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -54,6 +54,7 @@
 use std::{io::Write, sync::Arc};
 
 use crate::{
+    arrow::arrow_writer::ArrowWriterOptions,
     arrow::ArrowWriter,
     errors::{ParquetError, Result},
     file::properties::WriterProperties,
@@ -97,9 +98,29 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
         arrow_schema: SchemaRef,
         buffer_size: usize,
         props: Option<WriterProperties>,
+    ) -> Result<Self> {
+        let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
+        Self::try_new_with_options(writer, arrow_schema, buffer_size, options)
+    }
+
+    /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
+    ///
+    /// `buffer_size` determines the number of bytes to buffer before flushing
+    /// to the underlying [`AsyncWrite`]
+    ///
+    /// The intermediate buffer will automatically be resized if necessary
+    ///
+    /// [`Self::write`] will flush this intermediate buffer if it is at least
+    /// half full
+    pub fn try_new_with_options(
+        writer: W,
+        arrow_schema: SchemaRef,
+        buffer_size: usize,
+        options: ArrowWriterOptions,
     ) -> Result<Self> {
         let shared_buffer = SharedBuffer::new(buffer_size);
-        let sync_writer = ArrowWriter::try_new(shared_buffer.clone(), arrow_schema, props)?;
+        let sync_writer =
+            ArrowWriter::try_new_with_options(shared_buffer.clone(), arrow_schema, options)?;
 
         Ok(Self {
             sync_writer,

Reply via email to