This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 30c14abd33 Optionally coerce names of maps and lists to match Parquet specification (#6828)
30c14abd33 is described below
commit 30c14abd3340735e5e4ab9375628f8d8ba7223b4
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Dec 5 15:05:29 2024 -0800
Optionally coerce names of maps and lists to match Parquet specification (#6828)
* optionally coerce names of maps and lists to match Parquet spec
* less verbose
* add ArrowWriter round trip test
* move documentation to builder
* use create_random_array for map and list arrays
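For reference, a minimal sketch of opting into the new behavior from user code; the column name, data, and output path are illustrative, while WriterProperties::builder().set_coerce_types(true) and ArrowWriter are the APIs exercised by the tests in this change:

    use std::fs::File;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() {
        // Illustrative single-column batch; any schema containing List or Map
        // fields benefits from the renaming described above.
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
        let array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
        let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();

        // Opt in to coercing list/map schema element names to the ones required
        // by the Parquet specification ("element", "key_value", "key", "value").
        let props = WriterProperties::builder().set_coerce_types(true).build();

        let file = File::create("/tmp/coerced.parquet").unwrap();
        let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }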
---
parquet/src/arrow/arrow_writer/mod.rs | 51 ++++++++++++++++
parquet/src/arrow/schema/mod.rs | 110 ++++++++++++++++++++++++++++++++--
parquet/src/bin/parquet-rewrite.rs | 7 +++
parquet/src/file/properties.rs | 28 +++++----
4 files changed, 179 insertions(+), 17 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 222d86131e..bb6ebf75ec 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1088,6 +1088,7 @@ mod tests {
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
use arrow::error::Result as ArrowResult;
+ use arrow::util::data_gen::create_random_array;
use arrow::util::pretty::pretty_format_batches;
use arrow::{array::*, buffer::Buffer};
use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
@@ -2491,6 +2492,56 @@ mod tests {
one_column_roundtrip(values, false);
}
+ #[test]
+ fn list_and_map_coerced_names() {
+ // Create map and list with non-Parquet naming
+ let list_field =
+ Field::new_list("my_list", Field::new("item", DataType::Int32, false), false);
+ let map_field = Field::new_map(
+ "my_map",
+ "entries",
+ Field::new("keys", DataType::Int32, false),
+ Field::new("values", DataType::Int32, true),
+ false,
+ true,
+ );
+
+ let list_array = create_random_array(&list_field, 100, 0.0, 0.0).unwrap();
+ let map_array = create_random_array(&map_field, 100, 0.0, 0.0).unwrap();
+
+ let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
+
+ // Write data to Parquet but coerce names to match spec
+ let props = Some(WriterProperties::builder().set_coerce_types(true).build());
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), arrow_schema.clone(), props).unwrap();
+
+ let batch = RecordBatch::try_new(arrow_schema, vec![list_array, map_array]).unwrap();
+ writer.write(&batch).unwrap();
+ let file_metadata = writer.close().unwrap();
+
+ // Coerced name of "item" should be "element"
+ assert_eq!(file_metadata.schema[3].name, "element");
+ // Coerced name of "entries" should be "key_value"
+ assert_eq!(file_metadata.schema[5].name, "key_value");
+ // Coerced name of "keys" should be "key"
+ assert_eq!(file_metadata.schema[6].name, "key");
+ // Coerced name of "values" should be "value"
+ assert_eq!(file_metadata.schema[7].name, "value");
+
+ // Double check schema after reading from the file
+ let reader = SerializedFileReader::new(file).unwrap();
+ let file_schema = reader.metadata().file_metadata().schema();
+ let fields = file_schema.get_fields();
+ let list_field = &fields[0].get_fields()[0];
+ assert_eq!(list_field.get_fields()[0].name(), "element");
+ let map_field = &fields[1].get_fields()[0];
+ assert_eq!(map_field.name(), "key_value");
+ assert_eq!(map_field.get_fields()[0].name(), "key");
+ assert_eq!(map_field.get_fields()[1].name(), "value");
+ }
+
#[test]
fn fallback_flush_data_page() {
//tests if the Fallback::flush_data_page clears all buffers correctly
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index ec34840d85..0fbcb4856e 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -303,6 +303,11 @@ pub fn decimal_length_from_precision(precision: u8) -> usize {
/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
+ const PARQUET_LIST_ELEMENT_NAME: &str = "element";
+ const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
+ const PARQUET_KEY_FIELD_NAME: &str = "key";
+ const PARQUET_VALUE_FIELD_NAME: &str = "value";
+
let name = field.name().as_str();
let repetition = if field.is_nullable() {
Repetition::OPTIONAL
@@ -527,10 +532,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
.with_id(id)
.build(),
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
+ let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
+ // Ensure proper naming per the Parquet specification
+ let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
+ Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
+ } else {
+ Arc::new(arrow_to_parquet_type(f, coerce_types)?)
+ };
+
Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder("list")
- .with_fields(vec![Arc::new(arrow_to_parquet_type(f, coerce_types)?)])
+ .with_fields(vec![field_ref])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
@@ -559,13 +572,29 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
}
DataType::Map(field, _) => {
if let DataType::Struct(struct_fields) = field.data_type() {
+ // If coercing then set inner struct name to "key_value"
+ let map_struct_name = if coerce_types {
+ PARQUET_MAP_STRUCT_NAME
+ } else {
+ field.name()
+ };
+
+ // If coercing then ensure struct fields are named "key" and "value"
+ let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
+ if coerce_types && fld.name() != name {
+ let f = fld.as_ref().clone().with_name(name);
+ Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
+ } else {
+ Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
+ }
+ };
+ let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
+ let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
+
Type::group_type_builder(name)
.with_fields(vec![Arc::new(
- Type::group_type_builder(field.name())
- .with_fields(vec![
- Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
- Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
- ])
+ Type::group_type_builder(map_struct_name)
+ .with_fields(vec![key_field, val_field])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
@@ -1420,6 +1449,75 @@ mod tests {
assert_eq!(arrow_fields, converted_arrow_fields);
}
+ #[test]
+ fn test_coerced_map_list() {
+ // Create Arrow schema with non-Parquet naming
+ let arrow_fields = vec![
+ Field::new_list(
+ "my_list",
+ Field::new("item", DataType::Boolean, true),
+ false,
+ ),
+ Field::new_map(
+ "my_map",
+ "entries",
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", DataType::Int32, true),
+ false,
+ true,
+ ),
+ ];
+ let arrow_schema = Schema::new(arrow_fields);
+
+ // Create Parquet schema with coerced names
+ let message_type = "
+ message parquet_schema {
+ REQUIRED GROUP my_list (LIST) {
+ REPEATED GROUP list {
+ OPTIONAL BOOLEAN element;
+ }
+ }
+ OPTIONAL GROUP my_map (MAP) {
+ REPEATED GROUP key_value {
+ REQUIRED BINARY key (STRING);
+ OPTIONAL INT32 value;
+ }
+ }
+ }
+ ";
+ let parquet_group_type = parse_message_type(message_type).unwrap();
+ let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
+ let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, true).unwrap();
+ assert_eq!(
+ parquet_schema.columns().len(),
+ converted_arrow_schema.columns().len()
+ );
+
+ // Create Parquet schema without coerced names
+ let message_type = "
+ message parquet_schema {
+ REQUIRED GROUP my_list (LIST) {
+ REPEATED GROUP list {
+ OPTIONAL BOOLEAN item;
+ }
+ }
+ OPTIONAL GROUP my_map (MAP) {
+ REPEATED GROUP entries {
+ REQUIRED BINARY keys (STRING);
+ OPTIONAL INT32 values;
+ }
+ }
+ }
+ ";
+ let parquet_group_type = parse_message_type(message_type).unwrap();
+ let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
+ let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, false).unwrap();
+ assert_eq!(
+ parquet_schema.columns().len(),
+ converted_arrow_schema.columns().len()
+ );
+ }
+
#[test]
fn test_field_to_column_desc() {
let message_type = "
diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs
index ad0f7ae0df..eaecda5037 100644
--- a/parquet/src/bin/parquet-rewrite.rs
+++ b/parquet/src/bin/parquet-rewrite.rs
@@ -199,6 +199,10 @@ struct Args {
/// Sets writer version.
#[clap(long)]
writer_version: Option<WriterVersionArgs>,
+
+ /// Sets whether to coerce Arrow types to match Parquet specification
+ #[clap(long)]
+ coerce_types: Option<bool>,
}
fn main() {
@@ -262,6 +266,9 @@ fn main() {
if let Some(value) = args.writer_version {
writer_properties_builder = writer_properties_builder.set_writer_version(value.into());
}
+ if let Some(value) = args.coerce_types {
+ writer_properties_builder = writer_properties_builder.set_coerce_types(value);
+ }
let writer_properties = writer_properties_builder.build();
let mut parquet_writer = ArrowWriter::try_new(
File::create(&args.output).expect("Unable to open output file"),
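(Assuming clap's default kebab-case flag naming for the new coerce_types argument, together with the tool's existing input/output options, an invocation would look roughly like: parquet-rewrite --input in.parquet --output out.parquet --coerce-types true.)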
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 1e8a4868df..aac450acd8 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -287,15 +287,7 @@ impl WriterProperties {
self.statistics_truncate_length
}
- /// Returns `coerce_types` boolean
- ///
- /// Some Arrow types do not have a corresponding Parquet logical type.
- /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
- /// Writers have the option to coerce these into native Parquet types. Type
- /// coercion allows for meaningful representations that do not require
- /// downstream readers to consider the embedded Arrow schema. However, type
- /// coercion also prevents the data from being losslessly round-tripped. This method
- /// returns `true` if type coercion enabled.
+ /// Returns `true` if type coercion is enabled.
pub fn coerce_types(&self) -> bool {
self.coerce_types
}
@@ -788,8 +780,22 @@ impl WriterPropertiesBuilder {
self
}
- /// Sets flag to enable/disable type coercion.
- /// Takes precedence over globally defined settings.
+ /// Sets flag to control if type coercion is enabled (defaults to `false`).
+ ///
+ /// # Notes
+ /// Some Arrow types do not have a corresponding Parquet logical type.
+ /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
+ /// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
+ /// to have specific names to be considered fully compliant.
+ /// Writers have the option to coerce these types and names to match those required
+ /// by the Parquet specification.
+ /// This type coercion allows for meaningful representations that do not require
+ /// downstream readers to consider the embedded Arrow schema, and can allow for greater
+ /// compatibility with other Parquet implementations. However, type
+ /// coercion also prevents the data from being losslessly round-tripped.
+ ///
+ /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+ /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
self.coerce_types = coerce_types;
self