This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f521e11dc feat: add append_key_value_metadata (#3367)
f521e11dc is described below
commit f521e11dc2da0313cb1a16958d39a67a41a25fa6
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Dec 21 01:01:12 2022 +0800
feat: add append_key_value_metadata (#3367)
* Add update_key_value_metadata
* Add comments
* Address review
* fix clippy
* Update parquet/src/arrow/arrow_writer/mod.rs
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
* Fix reviews
* Test and fix
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
parquet/src/arrow/arrow_writer/mod.rs | 9 +++-
parquet/src/file/writer.rs | 80 ++++++++++++++++++++++++++++++++++-
2 files changed, 87 insertions(+), 2 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 53ca71d28..5cf33d125 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -32,7 +32,7 @@ use super::schema::{
use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::RowGroupMetaDataPtr;
+use crate::file::metadata::{KeyValue, RowGroupMetaDataPtr};
use crate::file::properties::WriterProperties;
use crate::file::writer::SerializedRowGroupWriter;
use crate::{data_type::*, file::writer::SerializedFileWriter};
@@ -158,6 +158,13 @@ impl<W: Write> ArrowWriter<W> {
self.flush_rows(self.buffered_rows)
}
+ /// Additional [`KeyValue`] metadata to be written in addition to those from [`WriterProperties`]
+ ///
+ /// This method provides a way to append kv_metadata after writing a RecordBatch
+ pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
+ self.writer.append_key_value_metadata(kv_metadata)
+ }
+
/// Flushes `num_rows` from the buffer into a new row group
fn flush_rows(&mut self, num_rows: usize) -> Result<()> {
if num_rows == 0 {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index a12d5477c..2d879be80 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -142,6 +142,8 @@ pub struct SerializedFileWriter<W: Write> {
column_indexes: Vec<Vec<Option<ColumnIndex>>>,
offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
row_group_index: usize,
+ // kv_metadatas will be appended to `props` when `write_metadata` is called
+ kv_metadatas: Vec<KeyValue>,
}
impl<W: Write> SerializedFileWriter<W> {
@@ -159,6 +161,7 @@ impl<W: Write> SerializedFileWriter<W> {
column_indexes: Vec::new(),
offset_indexes: Vec::new(),
row_group_index: 0,
+ kv_metadatas: Vec::new(),
})
}
@@ -309,12 +312,18 @@ impl<W: Write> SerializedFileWriter<W> {
self.write_column_indexes(&mut row_groups)?;
self.write_offset_indexes(&mut row_groups)?;
+ let key_value_metadata = match self.props.key_value_metadata() {
+ Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
+ None if self.kv_metadatas.is_empty() => None,
+ None => Some(self.kv_metadatas.clone()),
+ };
+
let file_metadata = parquet::FileMetaData {
num_rows,
row_groups,
+ key_value_metadata,
version: self.props.writer_version().as_num(),
schema: types::to_thrift(self.schema.as_ref())?,
- key_value_metadata: self.props.key_value_metadata().cloned(),
created_by: Some(self.props.created_by().to_owned()),
column_orders: None,
encryption_algorithm: None,
@@ -347,6 +356,10 @@ impl<W: Write> SerializedFileWriter<W> {
}
}
+ pub fn append_key_value_metadata(&mut self, kv_metadata: KeyValue) {
+ self.kv_metadatas.push(kv_metadata);
+ }
+
/// Writes the file footer and returns the underlying writer.
pub fn into_inner(mut self) -> Result<W> {
self.assert_previous_writer_closed()?;
@@ -1355,4 +1368,69 @@ mod tests {
})
});
}
+
+ fn test_kv_metadata(
+ initial_kv: Option<Vec<KeyValue>>,
+ final_kv: Option<Vec<KeyValue>>,
+ ) {
+ let schema = Arc::new(
+ types::Type::group_type_builder("schema")
+ .with_fields(&mut vec![Arc::new(
+ types::Type::primitive_type_builder("col1", Type::INT32)
+ .with_repetition(Repetition::REQUIRED)
+ .build()
+ .unwrap(),
+ )])
+ .build()
+ .unwrap(),
+ );
+ let mut out = Vec::with_capacity(1024);
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_key_value_metadata(initial_kv.clone())
+ .build(),
+ );
+ let mut writer = SerializedFileWriter::new(&mut out, schema, props).unwrap();
+ let mut row_group_writer = writer.next_row_group().unwrap();
+ let column = row_group_writer.next_column().unwrap().unwrap();
+ column.close().unwrap();
+ row_group_writer.close().unwrap();
+ if let Some(kvs) = &final_kv {
+ for kv in kvs {
+ writer.append_key_value_metadata(kv.clone())
+ }
+ }
+ writer.close().unwrap();
+
+ let reader = SerializedFileReader::new(Bytes::from(out)).unwrap();
+ let metadata = reader.metadata().file_metadata();
+ let keys = metadata.key_value_metadata();
+
+ match (initial_kv, final_kv) {
+ (Some(a), Some(b)) => {
+ let keys = keys.unwrap();
+ assert_eq!(keys.len(), a.len() + b.len());
+ assert_eq!(&keys[..a.len()], a.as_slice());
+ assert_eq!(&keys[a.len()..], b.as_slice());
+ }
+ (Some(v), None) => assert_eq!(keys.unwrap(), &v),
+ (None, Some(v)) if !v.is_empty() => assert_eq!(keys.unwrap(), &v),
+ _ => assert!(keys.is_none()),
+ }
+ }
+
+ #[test]
+ fn test_append_metadata() {
+ let kv1 = KeyValue::new("cupcakes".to_string(), "awesome".to_string());
+ let kv2 = KeyValue::new("bingo".to_string(), "bongo".to_string());
+
+ test_kv_metadata(None, None);
+ test_kv_metadata(Some(vec![kv1.clone()]), None);
+ test_kv_metadata(None, Some(vec![kv2.clone()]));
+ test_kv_metadata(Some(vec![kv1.clone()]), Some(vec![kv2.clone()]));
+ test_kv_metadata(Some(vec![]), Some(vec![kv2]));
+ test_kv_metadata(Some(vec![]), Some(vec![]));
+ test_kv_metadata(Some(vec![kv1]), Some(vec![]));
+ test_kv_metadata(None, Some(vec![]));
+ }
}