This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 30c14abd33 Optionally coerce names of maps and lists to match Parquet specification (#6828)
30c14abd33 is described below

commit 30c14abd3340735e5e4ab9375628f8d8ba7223b4
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Dec 5 15:05:29 2024 -0800

    Optionally coerce names of maps and lists to match Parquet specification (#6828)
    
    * optionally coerce names of maps and lists to match Parquet spec
    
    * less verbose
    
    * add ArrowWriter round trip test
    
    * move documentation to builder
    
    * use create_random_array for map and list arrays
---
 parquet/src/arrow/arrow_writer/mod.rs |  51 ++++++++++++++++
 parquet/src/arrow/schema/mod.rs       | 110 ++++++++++++++++++++++++++++++++--
 parquet/src/bin/parquet-rewrite.rs    |   7 +++
 parquet/src/file/properties.rs        |  28 +++++----
 4 files changed, 179 insertions(+), 17 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 222d86131e..bb6ebf75ec 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1088,6 +1088,7 @@ mod tests {
     use arrow::datatypes::ToByteSlice;
     use arrow::datatypes::{DataType, Schema};
     use arrow::error::Result as ArrowResult;
+    use arrow::util::data_gen::create_random_array;
     use arrow::util::pretty::pretty_format_batches;
     use arrow::{array::*, buffer::Buffer};
     use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano, NullBuffer};
@@ -2491,6 +2492,56 @@ mod tests {
         one_column_roundtrip(values, false);
     }
 
+    #[test]
+    fn list_and_map_coerced_names() {
+        // Create map and list with non-Parquet naming
+        let list_field =
+            Field::new_list("my_list", Field::new("item", DataType::Int32, 
false), false);
+        let map_field = Field::new_map(
+            "my_map",
+            "entries",
+            Field::new("keys", DataType::Int32, false),
+            Field::new("values", DataType::Int32, true),
+            false,
+            true,
+        );
+
+        let list_array = create_random_array(&list_field, 100, 0.0, 
0.0).unwrap();
+        let map_array = create_random_array(&map_field, 100, 0.0, 
0.0).unwrap();
+
+        let arrow_schema = Arc::new(Schema::new(vec![list_field, map_field]));
+
+        // Write data to Parquet but coerce names to match spec
+        let props = 
Some(WriterProperties::builder().set_coerce_types(true).build());
+        let file = tempfile::tempfile().unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), 
arrow_schema.clone(), props).unwrap();
+
+        let batch = RecordBatch::try_new(arrow_schema, vec![list_array, 
map_array]).unwrap();
+        writer.write(&batch).unwrap();
+        let file_metadata = writer.close().unwrap();
+
+        // Coerced name of "item" should be "element"
+        assert_eq!(file_metadata.schema[3].name, "element");
+        // Coerced name of "entries" should be "key_value"
+        assert_eq!(file_metadata.schema[5].name, "key_value");
+        // Coerced name of "keys" should be "key"
+        assert_eq!(file_metadata.schema[6].name, "key");
+        // Coerced name of "values" should be "value"
+        assert_eq!(file_metadata.schema[7].name, "value");
+
+        // Double check schema after reading from the file
+        let reader = SerializedFileReader::new(file).unwrap();
+        let file_schema = reader.metadata().file_metadata().schema();
+        let fields = file_schema.get_fields();
+        let list_field = &fields[0].get_fields()[0];
+        assert_eq!(list_field.get_fields()[0].name(), "element");
+        let map_field = &fields[1].get_fields()[0];
+        assert_eq!(map_field.name(), "key_value");
+        assert_eq!(map_field.get_fields()[0].name(), "key");
+        assert_eq!(map_field.get_fields()[1].name(), "value");
+    }
+
     #[test]
     fn fallback_flush_data_page() {
         //tests if the Fallback::flush_data_page clears all buffers correctly
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index ec34840d85..0fbcb4856e 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -303,6 +303,11 @@ pub fn decimal_length_from_precision(precision: u8) -> usize {
 
 /// Convert an arrow field to a parquet `Type`
 fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
+    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
+    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
+    const PARQUET_KEY_FIELD_NAME: &str = "key";
+    const PARQUET_VALUE_FIELD_NAME: &str = "value";
+
     let name = field.name().as_str();
     let repetition = if field.is_nullable() {
         Repetition::OPTIONAL
@@ -527,10 +532,18 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
             .with_id(id)
             .build(),
         DataType::List(f) | DataType::FixedSizeList(f, _) | 
DataType::LargeList(f) => {
+            let field_ref = if coerce_types && f.name() != 
PARQUET_LIST_ELEMENT_NAME {
+                // Ensure proper naming per the Parquet specification
+                let ff = 
f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
+                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
+            } else {
+                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
+            };
+
             Type::group_type_builder(name)
                 .with_fields(vec![Arc::new(
                     Type::group_type_builder("list")
-                        .with_fields(vec![Arc::new(arrow_to_parquet_type(f, 
coerce_types)?)])
+                        .with_fields(vec![field_ref])
                         .with_repetition(Repetition::REPEATED)
                         .build()?,
                 )])
@@ -559,13 +572,29 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
         }
         DataType::Map(field, _) => {
             if let DataType::Struct(struct_fields) = field.data_type() {
+                // If coercing then set inner struct name to "key_value"
+                let map_struct_name = if coerce_types {
+                    PARQUET_MAP_STRUCT_NAME
+                } else {
+                    field.name()
+                };
+
+                // If coercing then ensure struct fields are named "key" and "value"
+                let fix_map_field = |name: &str, fld: &Arc<Field>| -> 
Result<Arc<Type>> {
+                    if coerce_types && fld.name() != name {
+                        let f = fld.as_ref().clone().with_name(name);
+                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
+                    } else {
+                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
+                    }
+                };
+                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, 
&struct_fields[0])?;
+                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, 
&struct_fields[1])?;
+
                 Type::group_type_builder(name)
                     .with_fields(vec![Arc::new(
-                        Type::group_type_builder(field.name())
-                            .with_fields(vec![
-                                
Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
-                                
Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
-                            ])
+                        Type::group_type_builder(map_struct_name)
+                            .with_fields(vec![key_field, val_field])
                             .with_repetition(Repetition::REPEATED)
                             .build()?,
                     )])
@@ -1420,6 +1449,75 @@ mod tests {
         assert_eq!(arrow_fields, converted_arrow_fields);
     }
 
+    #[test]
+    fn test_coerced_map_list() {
+        // Create Arrow schema with non-Parquet naming
+        let arrow_fields = vec![
+            Field::new_list(
+                "my_list",
+                Field::new("item", DataType::Boolean, true),
+                false,
+            ),
+            Field::new_map(
+                "my_map",
+                "entries",
+                Field::new("keys", DataType::Utf8, false),
+                Field::new("values", DataType::Int32, true),
+                false,
+                true,
+            ),
+        ];
+        let arrow_schema = Schema::new(arrow_fields);
+
+        // Create Parquet schema with coerced names
+        let message_type = "
+        message parquet_schema {
+            REQUIRED GROUP my_list (LIST) {
+                REPEATED GROUP list {
+                    OPTIONAL BOOLEAN element;
+                }
+            }
+            OPTIONAL GROUP my_map (MAP) {
+                REPEATED GROUP key_value {
+                    REQUIRED BINARY key (STRING);
+                    OPTIONAL INT32 value;
+                }
+            }
+        }
+        ";
+        let parquet_group_type = parse_message_type(message_type).unwrap();
+        let parquet_schema = 
SchemaDescriptor::new(Arc::new(parquet_group_type));
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, 
true).unwrap();
+        assert_eq!(
+            parquet_schema.columns().len(),
+            converted_arrow_schema.columns().len()
+        );
+
+        // Create Parquet schema without coerced names
+        let message_type = "
+        message parquet_schema {
+            REQUIRED GROUP my_list (LIST) {
+                REPEATED GROUP list {
+                    OPTIONAL BOOLEAN item;
+                }
+            }
+            OPTIONAL GROUP my_map (MAP) {
+                REPEATED GROUP entries {
+                    REQUIRED BINARY keys (STRING);
+                    OPTIONAL INT32 values;
+                }
+            }
+        }
+        ";
+        let parquet_group_type = parse_message_type(message_type).unwrap();
+        let parquet_schema = 
SchemaDescriptor::new(Arc::new(parquet_group_type));
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, 
false).unwrap();
+        assert_eq!(
+            parquet_schema.columns().len(),
+            converted_arrow_schema.columns().len()
+        );
+    }
+
     #[test]
     fn test_field_to_column_desc() {
         let message_type = "
diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs
index ad0f7ae0df..eaecda5037 100644
--- a/parquet/src/bin/parquet-rewrite.rs
+++ b/parquet/src/bin/parquet-rewrite.rs
@@ -199,6 +199,10 @@ struct Args {
     /// Sets writer version.
     #[clap(long)]
     writer_version: Option<WriterVersionArgs>,
+
+    /// Sets whether to coerce Arrow types to match Parquet specification
+    #[clap(long)]
+    coerce_types: Option<bool>,
 }
 
 fn main() {
@@ -262,6 +266,9 @@ fn main() {
     if let Some(value) = args.writer_version {
         writer_properties_builder = 
writer_properties_builder.set_writer_version(value.into());
     }
+    if let Some(value) = args.coerce_types {
+        writer_properties_builder = 
writer_properties_builder.set_coerce_types(value);
+    }
     let writer_properties = writer_properties_builder.build();
     let mut parquet_writer = ArrowWriter::try_new(
         File::create(&args.output).expect("Unable to open output file"),
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 1e8a4868df..aac450acd8 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -287,15 +287,7 @@ impl WriterProperties {
         self.statistics_truncate_length
     }
 
-    /// Returns `coerce_types` boolean
-    ///
-    /// Some Arrow types do not have a corresponding Parquet logical type.
-    /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
-    /// Writers have the option to coerce these into native Parquet types. Type
-    /// coercion allows for meaningful representations that do not require
-    /// downstream readers to consider the embedded Arrow schema. However, type
-    /// coercion also prevents the data from being losslessly round-tripped. This method
-    /// returns `true` if type coercion enabled.
+    /// Returns `true` if type coercion is enabled.
     pub fn coerce_types(&self) -> bool {
         self.coerce_types
     }
@@ -788,8 +780,22 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    /// Sets flag to enable/disable type coercion.
-    /// Takes precedence over globally defined settings.
+    /// Sets flag to control if type coercion is enabled (defaults to `false`).
+    ///
+    /// # Notes
+    /// Some Arrow types do not have a corresponding Parquet logical type.
+    /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
+    /// Also, for [`List`] and [`Map`] types, Parquet expects certain schema elements
+    /// to have specific names to be considered fully compliant.
+    /// Writers have the option to coerce these types and names to match those required
+    /// by the Parquet specification.
+    /// This type coercion allows for meaningful representations that do not require
+    /// downstream readers to consider the embedded Arrow schema, and can allow for greater
+    /// compatibility with other Parquet implementations. However, type
+    /// coercion also prevents the data from being losslessly round-tripped.
+    ///
+    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
+    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
     pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
         self.coerce_types = coerce_types;
         self

Reply via email to