This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 0a92daa36ddd0f57da377c6c4863613f50edc48d
Author: Neville Dipale <nevilled...@gmail.com>
AuthorDate: Tue Aug 18 18:39:37 2020 +0200

    ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata

    This will allow preserving Arrow-specific metadata when writing or
    reading Parquet files created from C++ or Rust. If the schema can't be
    deserialised, the normal Parquet > Arrow schema conversion is performed.

    Closes #7917 from nevi-me/ARROW-8243

    Authored-by: Neville Dipale <nevilled...@gmail.com>
    Signed-off-by: Neville Dipale <nevilled...@gmail.com>
---
 rust/parquet/Cargo.toml                |   3 +-
 rust/parquet/src/arrow/arrow_writer.rs |  27 ++-
 rust/parquet/src/arrow/mod.rs          |   4 +
 rust/parquet/src/arrow/schema.rs       | 306 ++++++++++++++++++++++++++++-----
 rust/parquet/src/file/properties.rs    |   6 +-
 5 files changed, 290 insertions(+), 56 deletions(-)

diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 50d7c34..60e43c9 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -40,6 +40,7 @@ zstd = { version = "0.5", optional = true }
 chrono = "0.4"
 num-bigint = "0.3"
 arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT", optional = true }
+base64 = { version = "*", optional = true }
 
 [dev-dependencies]
 rand = "0.7"
@@ -52,4 +53,4 @@ arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT" }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 
 [features]
-default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 0c1c490..1ca8d50 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -24,6 +24,7 @@ use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
 
+use super::schema::add_encoded_arrow_schema_to_metadata;
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::WriterProperties;
@@ -53,17 +54,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
     pub fn try_new(
         writer: W,
         arrow_schema: SchemaRef,
-        props: Option<Rc<WriterProperties>>,
+        props: Option<WriterProperties>,
     ) -> Result<Self> {
         let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
-        let props = match props {
-            Some(props) => props,
-            None => Rc::new(WriterProperties::builder().build()),
-        };
+        // add serialized arrow schema
+        let mut props = props.unwrap_or_else(|| WriterProperties::builder().build());
+        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+
         let file_writer = SerializedFileWriter::new(
             writer.try_clone()?,
             schema.root_schema_ptr(),
-            props,
+            Rc::new(props),
         )?;
 
         Ok(Self {
@@ -495,7 +496,7 @@ mod tests {
     use arrow::record_batch::{RecordBatch, RecordBatchReader};
 
     use crate::arrow::{ArrowReader, ParquetFileArrowReader};
-    use crate::file::reader::SerializedFileReader;
+    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
     use crate::util::test_common::get_temp_file;
 
     #[test]
@@ -584,7 +585,7 @@ mod tests {
         )
         .unwrap();
 
-        let mut file = get_temp_file("test_arrow_writer.parquet", &[]);
+        let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]);
         let mut writer =
             ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
                 .unwrap();
@@ -674,8 +675,16 @@ mod tests {
         )
         .unwrap();
 
+        let props = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![KeyValue {
+                key: "test_key".to_string(),
+                value: Some("test_value".to_string()),
+            }]))
+            .build();
+
         let file = get_temp_file("test_arrow_writer_complex.parquet", &[]);
-        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
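For callers of the writer, the visible change above is that ArrowWriter::try_new now
takes Option<WriterProperties> by value instead of Option<Rc<WriterProperties>>: the
writer injects the encoded Arrow schema into the properties' key-value metadata before
wrapping them in an Rc itself. A minimal sketch of the new call shape (the single-column
schema and the use of a plain std::fs::File are illustrative, not part of this patch):

    use std::fs::File;
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;
    use parquet::errors::Result;
    use parquet::file::properties::WriterProperties;

    fn write_with_props(file: File) -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
        // Properties are now passed by value; try_new adds the base64-encoded
        // Arrow schema to their key-value metadata internally.
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
        // Closing without writing any batch still persists the schema metadata.
        writer.close()
    }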
"test_key".to_string(), + value: Some("test_value".to_string()), + }])) + .build(); + let file = get_temp_file("test_arrow_writer_complex.parquet", &[]); - let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap(); + let mut writer = + ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); } diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index 8499481..2b012fb 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -58,6 +58,10 @@ pub mod schema; pub use self::arrow_reader::ArrowReader; pub use self::arrow_reader::ParquetFileArrowReader; +pub use self::arrow_writer::ArrowWriter; pub use self::schema::{ arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, }; + +/// Schema metadata key used to store serialized Arrow IPC schema +pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema"; diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index aebb9e7..d4cfe1f 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -26,24 +26,33 @@ use std::collections::{HashMap, HashSet}; use std::rc::Rc; +use arrow::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit}; + use crate::basic::{LogicalType, Repetition, Type as PhysicalType}; use crate::errors::{ParquetError::ArrowError, Result}; -use crate::file::metadata::KeyValue; +use crate::file::{metadata::KeyValue, properties::WriterProperties}; use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr}; -use arrow::datatypes::TimeUnit; -use arrow::datatypes::{DataType, DateUnit, Field, Schema}; - -/// Convert parquet schema to arrow schema including optional metadata. +/// Convert Parquet schema to Arrow schema including optional metadata. +/// Attempts to decode any existing Arrow shcema metadata, falling back +/// to converting the Parquet schema column-wise pub fn parquet_to_arrow_schema( parquet_schema: &SchemaDescriptor, - metadata: &Option<Vec<KeyValue>>, + key_value_metadata: &Option<Vec<KeyValue>>, ) -> Result<Schema> { - parquet_to_arrow_schema_by_columns( - parquet_schema, - 0..parquet_schema.columns().len(), - metadata, - ) + let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default(); + let arrow_schema_metadata = metadata + .remove(super::ARROW_SCHEMA_META_KEY) + .map(|encoded| get_arrow_schema_from_metadata(&encoded)); + + match arrow_schema_metadata { + Some(Some(schema)) => Ok(schema), + _ => parquet_to_arrow_schema_by_columns( + parquet_schema, + 0..parquet_schema.columns().len(), + key_value_metadata, + ), + } } /// Convert parquet schema to arrow schema including optional metadata, only preserving some leaf columns. @@ -81,6 +90,80 @@ where .map(|fields| Schema::new_with_metadata(fields, metadata)) } +/// Try to convert Arrow schema metadata into a schema +fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option<Schema> { + let decoded = base64::decode(encoded_meta); + match decoded { + Ok(bytes) => { + let slice = if bytes[0..4] == [255u8; 4] { + &bytes[8..] + } else { + bytes.as_slice() + }; + let message = arrow::ipc::get_root_as_message(slice); + message + .header_as_schema() + .map(arrow::ipc::convert::fb_to_schema) + } + Err(err) => { + // The C++ implementation returns an error if the schema can't be parsed. 
+
+/// Encodes the Arrow schema into the IPC format, and base64 encodes it
+fn encode_arrow_schema(schema: &Schema) -> String {
+    let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema);
+
+    // manually prepending the length to the schema as arrow uses the legacy IPC format
+    // TODO: change after addressing ARROW-9777
+    let schema_len = serialized_schema.len();
+    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
+    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
+    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
+    len_prefix_schema.append(&mut serialized_schema);
+
+    base64::encode(&len_prefix_schema)
+}
+
+/// Mutates writer metadata by storing the encoded Arrow schema.
+/// If there is an existing Arrow schema metadata, it is replaced.
+pub(crate) fn add_encoded_arrow_schema_to_metadata(
+    schema: &Schema,
+    props: &mut WriterProperties,
+) {
+    let encoded = encode_arrow_schema(schema);
+
+    let schema_kv = KeyValue {
+        key: super::ARROW_SCHEMA_META_KEY.to_string(),
+        value: Some(encoded),
+    };
+
+    let mut meta = props.key_value_metadata.clone().unwrap_or_default();
+    // check if ARROW:schema exists, and overwrite it
+    let schema_meta = meta
+        .iter()
+        .enumerate()
+        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
+    match schema_meta {
+        Some((i, _)) => {
+            meta.remove(i);
+            meta.push(schema_kv);
+        }
+        None => {
+            meta.push(schema_kv);
+        }
+    }
+    props.key_value_metadata = Some(meta);
+}
+
 /// Convert arrow schema to parquet schema
 pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
     let fields: Result<Vec<TypePtr>> = schema
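The prefix written by encode_arrow_schema mirrors what get_arrow_schema_from_metadata
strips on the read path: four 0xFF continuation bytes, then the flatbuffer length as a
little-endian u32, then the IPC schema message itself. A standalone sketch of the
framing, independent of the arrow crate, to make the byte layout concrete:

    // Frame an IPC payload the way encode_arrow_schema does: a 4-byte
    // continuation marker, then the payload length as a little-endian u32.
    fn frame_ipc_message(mut payload: Vec<u8>) -> Vec<u8> {
        let len = payload.len() as u32;
        let mut framed = Vec::with_capacity(payload.len() + 8);
        framed.extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // continuation marker
        framed.extend_from_slice(&len.to_le_bytes()); // message length
        framed.append(&mut payload);
        framed
    }

    fn main() {
        let framed = frame_ipc_message(vec![1, 2, 3]);
        // The reader checks bytes 0..4 for the marker and skips 8 bytes in total.
        assert_eq!(&framed[0..4], &[255u8; 4]);
        assert_eq!(u32::from_le_bytes([framed[4], framed[5], framed[6], framed[7]]), 3);
        assert_eq!(&framed[8..], &[1, 2, 3]);
    }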
@@ -215,42 +298,48 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_logical_type(LogicalType::INTERVAL)
                 .with_repetition(repetition)
-                .with_length(3)
+                .with_length(12)
+                .build()
+        }
+        DataType::Binary | DataType::LargeBinary => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_repetition(repetition)
                 .build()
         }
-        DataType::Binary => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
-            .with_repetition(repetition)
-            .build(),
         DataType::FixedSizeBinary(length) => {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_repetition(repetition)
                 .with_length(*length)
                 .build()
         }
-        DataType::Utf8 => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
-            .with_logical_type(LogicalType::UTF8)
-            .with_repetition(repetition)
-            .build(),
-        DataType::List(dtype) | DataType::FixedSizeList(dtype, _) => {
-            Type::group_type_builder(name)
-                .with_fields(&mut vec![Rc::new(
-                    Type::group_type_builder("list")
-                        .with_fields(&mut vec![Rc::new({
-                            let list_field = Field::new(
-                                "element",
-                                *dtype.clone(),
-                                field.is_nullable(),
-                            );
-                            arrow_to_parquet_type(&list_field)?
-                        })])
-                        .with_repetition(Repetition::REPEATED)
-                        .build()?,
-                )])
-                .with_logical_type(LogicalType::LIST)
-                .with_repetition(Repetition::REQUIRED)
+        DataType::Utf8 | DataType::LargeUtf8 => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_logical_type(LogicalType::UTF8)
+                .with_repetition(repetition)
                 .build()
         }
+        DataType::List(dtype)
+        | DataType::FixedSizeList(dtype, _)
+        | DataType::LargeList(dtype) => Type::group_type_builder(name)
+            .with_fields(&mut vec![Rc::new(
+                Type::group_type_builder("list")
+                    .with_fields(&mut vec![Rc::new({
+                        let list_field =
+                            Field::new("element", *dtype.clone(), field.is_nullable());
+                        arrow_to_parquet_type(&list_field)?
+                    })])
+                    .with_repetition(Repetition::REPEATED)
+                    .build()?,
+            )])
+            .with_logical_type(LogicalType::LIST)
+            .with_repetition(Repetition::REQUIRED)
+            .build(),
         DataType::Struct(fields) => {
+            if fields.is_empty() {
+                return Err(ArrowError(
+                    "Parquet does not support writing empty structs".to_string(),
+                ));
+            }
             // recursively convert children to types/nodes
             let fields: Result<Vec<TypePtr>> = fields
                 .iter()
@@ -267,9 +356,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             let dict_field = Field::new(name, *value.clone(), field.is_nullable());
             arrow_to_parquet_type(&dict_field)
         }
-        DataType::LargeUtf8 | DataType::LargeBinary | DataType::LargeList(_) => {
-            Err(ArrowError("Large arrays not supported".to_string()))
-        }
     }
 }
 
 /// This struct is used to group methods and data structures used to convert parquet
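The with_length(3) to with_length(12) change above fixes an invalid physical length:
Parquet's INTERVAL logical type is defined as a FIXED_LEN_BYTE_ARRAY of 12 bytes
holding three little-endian unsigned 32-bit integers (months, days, milliseconds).
A small sketch of that layout; the helper name is ours, not part of the patch:

    // Pack an interval into Parquet's 12-byte INTERVAL representation:
    // months, days, and milliseconds, each as a little-endian u32.
    fn pack_parquet_interval(months: u32, days: u32, millis: u32) -> [u8; 12] {
        let mut buf = [0u8; 12];
        buf[0..4].copy_from_slice(&months.to_le_bytes());
        buf[4..8].copy_from_slice(&days.to_le_bytes());
        buf[8..12].copy_from_slice(&millis.to_le_bytes());
        buf
    }

    fn main() {
        // 1 month, 2 days, 3 ms
        assert_eq!(
            pack_parquet_interval(1, 2, 3),
            [1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]
        );
    }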
+ Field::new("c7", DataType::Time32(TimeUnit::Second), false), + Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false), + Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false), + Field::new( + "c16", + DataType::Timestamp( + TimeUnit::Millisecond, + Some(Arc::new("UTC".to_string())), + ), + false, + ), + Field::new( + "c17", + DataType::Timestamp( + TimeUnit::Microsecond, + Some(Arc::new("Africa/Johannesburg".to_string())), + ), + false, + ), + Field::new( + "c18", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false), + Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), + Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false), + Field::new( + "c22", + DataType::FixedSizeList(Box::new(DataType::Boolean), 5), + false, + ), + Field::new( + "c23", + DataType::List(Box::new(DataType::List(Box::new(DataType::Struct( + vec![ + Field::new("a", DataType::Int16, true), + Field::new("b", DataType::Float64, false), + ], + ))))), + true, + ), + Field::new( + "c24", + DataType::Struct(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::UInt16, false), + ]), + false, + ), + Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true), + Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true), + // Field::new("c27", DataType::Duration(TimeUnit::Second), false), + // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false), + // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false), + // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false), + // Field::new_dict( + // "c31", + // DataType::Dictionary( + // Box::new(DataType::Int32), + // Box::new(DataType::Utf8), + // ), + // true, + // 123, + // true, + // ), + Field::new("c32", DataType::LargeBinary, true), + Field::new("c33", DataType::LargeUtf8, true), + Field::new( + "c34", + DataType::LargeList(Box::new(DataType::LargeList(Box::new( + DataType::Struct(vec![ + Field::new("a", DataType::Int16, true), + Field::new("b", DataType::Float64, true), + ]), + )))), + true, + ), + ], + metadata, + ); + + // write to an empty parquet file so that schema is serialized + let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]); + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(schema.clone()), + None, + )?; + writer.close()?; + + // read file back + let parquet_reader = SerializedFileReader::try_from(file)?; + let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader)); + let read_schema = arrow_reader.get_schema()?; + assert_eq!(schema, read_schema); + Ok(()) + } } diff --git a/rust/parquet/src/file/properties.rs b/rust/parquet/src/file/properties.rs index 188d6ec..b62ce7b 100644 --- a/rust/parquet/src/file/properties.rs +++ b/rust/parquet/src/file/properties.rs @@ -89,8 +89,8 @@ pub type WriterPropertiesPtr = Rc<WriterProperties>; /// Writer properties. /// -/// It is created as an immutable data structure, use [`WriterPropertiesBuilder`] to -/// assemble the properties. +/// All properties except the key-value metadata are immutable, +/// use [`WriterPropertiesBuilder`] to assemble these properties. 
diff --git a/rust/parquet/src/file/properties.rs b/rust/parquet/src/file/properties.rs
index 188d6ec..b62ce7b 100644
--- a/rust/parquet/src/file/properties.rs
+++ b/rust/parquet/src/file/properties.rs
@@ -89,8 +89,8 @@ pub type WriterPropertiesPtr = Rc<WriterProperties>;
 
 /// Writer properties.
 ///
-/// It is created as an immutable data structure, use [`WriterPropertiesBuilder`] to
-/// assemble the properties.
+/// All properties except the key-value metadata are immutable;
+/// use [`WriterPropertiesBuilder`] to assemble them.
 #[derive(Debug, Clone)]
 pub struct WriterProperties {
     data_pagesize_limit: usize,
@@ -99,7 +99,7 @@ pub struct WriterProperties {
     max_row_group_size: usize,
     writer_version: WriterVersion,
     created_by: String,
-    key_value_metadata: Option<Vec<KeyValue>>,
+    pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
     default_column_properties: ColumnProperties,
     column_properties: HashMap<ColumnPath, ColumnProperties>,
 }