This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new f521e11dc feat: add append_key_value_metadata (#3367)
f521e11dc is described below

commit f521e11dc2da0313cb1a16958d39a67a41a25fa6
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Dec 21 01:01:12 2022 +0800

    feat: add append_key_value_metadata (#3367)
    
    * Add update_key_value_metadata
    
    * Add comments
    
    * Address review
    
    * fix clippy
    
    * Update parquet/src/arrow/arrow_writer/mod.rs
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
    
    * Fix reviews
    
    * Test and fix
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 parquet/src/arrow/arrow_writer/mod.rs |  9 +++-
 parquet/src/file/writer.rs            | 80 ++++++++++++++++++++++++++++++++++-
 2 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 53ca71d28..5cf33d125 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -32,7 +32,7 @@ use super::schema::{
 use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
 use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::RowGroupMetaDataPtr;
+use crate::file::metadata::{KeyValue, RowGroupMetaDataPtr};
 use crate::file::properties::WriterProperties;
 use crate::file::writer::SerializedRowGroupWriter;
 use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -158,6 +158,13 @@ impl<W: Write> ArrowWriter<W> {
         self.flush_rows(self.buffered_rows)
     }
 
+    /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
+    ///
+    /// This method provides a way to append kv_metadata after writing RecordBatches
+    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
+        self.writer.append_key_value_metadata(kv_metadata)
+    }
+
     /// Flushes `num_rows` from the buffer into a new row group
     fn flush_rows(&mut self, num_rows: usize) -> Result<()> {
         if num_rows == 0 {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index a12d5477c..2d879be80 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -142,6 +142,8 @@ pub struct SerializedFileWriter<W: Write> {
     column_indexes: Vec<Vec<Option<ColumnIndex>>>,
     offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
+    // kv_metadatas will be appended to `props` when `write_metadata`
+    kv_metadatas: Vec<KeyValue>,
 }
 
 impl<W: Write> SerializedFileWriter<W> {
@@ -159,6 +161,7 @@ impl<W: Write> SerializedFileWriter<W> {
             column_indexes: Vec::new(),
             offset_indexes: Vec::new(),
             row_group_index: 0,
+            kv_metadatas: Vec::new(),
         })
     }
 
@@ -309,12 +312,18 @@ impl<W: Write> SerializedFileWriter<W> {
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
 
+        let key_value_metadata = match self.props.key_value_metadata() {
+            Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
+            None if self.kv_metadatas.is_empty() => None,
+            None => Some(self.kv_metadatas.clone()),
+        };
+
         let file_metadata = parquet::FileMetaData {
             num_rows,
             row_groups,
+            key_value_metadata,
             version: self.props.writer_version().as_num(),
             schema: types::to_thrift(self.schema.as_ref())?,
-            key_value_metadata: self.props.key_value_metadata().cloned(),
             created_by: Some(self.props.created_by().to_owned()),
             column_orders: None,
             encryption_algorithm: None,
@@ -347,6 +356,10 @@ impl<W: Write> SerializedFileWriter<W> {
         }
     }
 
+    pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
+        self.kv_metadatas.push(kv_metadata);
+    }
+
     /// Writes the file footer and returns the underlying writer.
     pub fn into_inner(mut self) -> Result<W> {
         self.assert_previous_writer_closed()?;
@@ -1355,4 +1368,69 @@ mod tests {
             })
         });
     }
+
+    fn test_kv_metadata(
+        initial_kv: Option<Vec<KeyValue>>,
+        final_kv: Option<Vec<KeyValue>>,
+    ) {
+        let schema = Arc::new(
+            types::Type::group_type_builder("schema")
+                .with_fields(&mut vec![Arc::new(
+                    types::Type::primitive_type_builder("col1", Type::INT32)
+                        .with_repetition(Repetition::REQUIRED)
+                        .build()
+                        .unwrap(),
+                )])
+                .build()
+                .unwrap(),
+        );
+        let mut out = Vec::with_capacity(1024);
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_key_value_metadata(initial_kv.clone())
+                .build(),
+        );
+        let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
+        let column = row_group_writer.next_column().unwrap().unwrap();
+        column.close().unwrap();
+        row_group_writer.close().unwrap();
+        if let Some(kvs) = &final_kv {
+            for kv in kvs {
+                writer.append_key_value_metadata(kv.clone())
+            }
+        }
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
+        let metadata = reader.metadata().file_metadata();
+        let keys = metadata.key_value_metadata();
+
+        match (initial_kv, final_kv) {
+            (Some(a), Some(b)) => {
+                let keys = keys.unwrap();
+                assert_eq!(keys.len(), a.len() + b.len());
+                assert_eq!(&keys[..a.len()], a.as_slice());
+                assert_eq!(&keys[a.len()..], b.as_slice());
+            }
+            (Some(v), None) => assert_eq!(keys.unwrap(), &v),
+            (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
+            _ => assert!(keys.is_none()),
+        }
+    }
+
+    #[test]
+    fn test_append_metadata() {
+        let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
+        let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
+
+        test_kv_metadata(None, None);
+        test_kv_metadata(Some(vec![kv1.clone()]), None);
+        test_kv_metadata(None, Some(vec![kv2.clone()]));
+        test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
+        test_kv_metadata(Some(vec![]), Some(vec![kv2]));
+        test_kv_metadata(Some(vec![]), Some(vec![]));
+        test_kv_metadata(Some(vec![kv1]), Some(vec![]));
+        test_kv_metadata(None, Some(vec![]));
+    }
 }

Reply via email to