This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b810e8f207 Support Field ID in ArrowWriter (#4702) (#4710)
b810e8f207 is described below

commit b810e8f207bbc70294b01acba4be32153c18a6ab
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Aug 17 15:05:42 2023 +0100

    Support Field ID in ArrowWriter (#4702) (#4710)
---
 parquet/src/arrow/mod.rs        |   7 ++
 parquet/src/arrow/schema/mod.rs | 200 ++++++++++++++++++++++++++++++----------
 2 files changed, 156 insertions(+), 51 deletions(-)

diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index aad4925c7c..8cca79b40e 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -130,6 +130,13 @@ pub use self::schema::{
 /// Schema metadata key used to store serialized Arrow IPC schema
 pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
 
+/// The value of this metadata key, if present on [`Field::metadata`], will be used
+/// to populate [`BasicTypeInfo::id`]
+///
+/// [`Field::metadata`]: arrow_schema::Field::metadata
+/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
+pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
+
 /// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
 ///
 /// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index bcfc2f884c..3f1994d108 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -295,14 +295,17 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
     } else {
         Repetition::REQUIRED
     };
+    let id = field_id(field);
     // create type from field
     match field.data_type() {
         DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Unknown))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Integer {
@@ -310,6 +313,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 is_signed: true,
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Integer {
@@ -317,12 +321,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 is_signed: true,
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Integer {
@@ -330,6 +337,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 is_signed: false,
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Integer {
@@ -337,6 +345,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 is_signed: false,
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Integer {
@@ -344,6 +353,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 is_signed: false,
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
             .with_logical_type(Some(LogicalType::Integer {
@@ -351,18 +361,22 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 is_signed: false,
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Float16 => Err(arrow_err!("Float16 arrays not supported")),
         DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Timestamp(TimeUnit::Second, _) => {
             // Cannot represent seconds in LogicalType
             Type::primitive_type_builder(name, PhysicalType::INT64)
                 .with_repetition(repetition)
+                .with_id(id)
                 .build()
         }
         DataType::Timestamp(time_unit, tz) => {
@@ -384,21 +398,25 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                     },
                 }))
                 .with_repetition(repetition)
+                .with_id(id)
                 .build()
         }
         DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         // date64 is cast to date32 (#1666)
         DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
             .with_logical_type(Some(LogicalType::Date))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Time32(TimeUnit::Second) => {
             // Cannot represent seconds in LogicalType
             Type::primitive_type_builder(name, PhysicalType::INT32)
                 .with_repetition(repetition)
+                .with_id(id)
                 .build()
         }
         DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
@@ -410,6 +428,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 },
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
             .with_logical_type(Some(LogicalType::Time {
@@ -421,6 +440,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 },
             }))
             .with_repetition(repetition)
+            .with_id(id)
             .build(),
         DataType::Duration(_) => {
             Err(arrow_err!("Converting Duration to parquet not supported",))
@@ -429,17 +449,20 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_converted_type(ConvertedType::INTERVAL)
                 .with_repetition(repetition)
+                .with_id(id)
                 .with_length(12)
                 .build()
         }
         DataType::Binary | DataType::LargeBinary => {
             Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                 .with_repetition(repetition)
+                .with_id(id)
                 .build()
         }
         DataType::FixedSizeBinary(length) => {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_repetition(repetition)
+                .with_id(id)
                 .with_length(*length)
                 .build()
         }
@@ -459,6 +482,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             };
             Type::primitive_type_builder(name, physical_type)
                 .with_repetition(repetition)
+                .with_id(id)
                 .with_length(length)
                 .with_logical_type(Some(LogicalType::Decimal {
                     scale: *scale as i32,
@@ -472,6 +496,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                 .with_logical_type(Some(LogicalType::String))
                 .with_repetition(repetition)
+                .with_id(id)
                 .build()
         }
         DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
@@ -484,6 +509,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 )])
                 .with_logical_type(Some(LogicalType::List))
                 .with_repetition(repetition)
+                .with_id(id)
                 .build()
         }
         DataType::Struct(fields) => {
@@ -500,6 +526,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::group_type_builder(name)
                 .with_fields(fields)
                 .with_repetition(repetition)
+                .with_id(id)
                 .build()
         }
         DataType::Map(field, _) => {
@@ -508,22 +535,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                     .with_fields(vec![Arc::new(
                         Type::group_type_builder(field.name())
                             .with_fields(vec![
-                                Arc::new(arrow_to_parquet_type(&Field::new(
-                                    struct_fields[0].name(),
-                                    struct_fields[0].data_type().clone(),
-                                    false,
-                                ))?),
-                                Arc::new(arrow_to_parquet_type(&Field::new(
-                                    struct_fields[1].name(),
-                                    struct_fields[1].data_type().clone(),
-                                    struct_fields[1].is_nullable(),
-                                ))?),
+                                Arc::new(arrow_to_parquet_type(&struct_fields[0])?),
+                                Arc::new(arrow_to_parquet_type(&struct_fields[1])?),
                             ])
                             .with_repetition(Repetition::REPEATED)
                             .build()?,
                     )])
                     .with_logical_type(Some(LogicalType::Map))
                     .with_repetition(repetition)
+                    .with_id(id)
                     .build()
             } else {
                 Err(arrow_err!(
@@ -543,6 +563,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
     }
 }
 
+fn field_id(field: &Field) -> Option<i32> {
+    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
+    value.parse().ok() // Fail quietly if not a valid integer
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -551,6 +576,7 @@ mod tests {
 
     use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
 
+    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
     use crate::file::metadata::KeyValue;
     use crate::{
         arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
@@ -1555,17 +1581,18 @@ mod tests {
 
     #[test]
     fn test_arrow_schema_roundtrip() -> Result<()> {
-        // This tests the roundtrip of an Arrow schema
-        // Fields that are commented out fail roundtrip tests or are unsupported by the writer
-        let metadata: HashMap<String, String> =
-            [("Key".to_string(), "Value".to_string())]
-                .iter()
-                .cloned()
-                .collect();
+        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
+            a.iter()
+                .map(|(a, b)| (a.to_string(), b.to_string()))
+                .collect()
+        };
 
         let schema = Schema::new_with_metadata(
             vec![
-                Field::new("c1", DataType::Utf8, false),
+                Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[
+                    ("Key", "Foo"),
+                    (PARQUET_FIELD_ID_META_KEY, "2"),
+                ])),
                 Field::new("c2", DataType::Binary, false),
                 Field::new("c3", DataType::FixedSizeBinary(3), false),
                 Field::new("c4", DataType::Boolean, false),
@@ -1598,24 +1625,40 @@ mod tests {
                 Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
                 Field::new_list(
                     "c21",
-                    Field::new("list", DataType::Boolean, true),
+                    Field::new("item", DataType::Boolean, true).with_metadata(meta(&[
+                        ("Key", "Bar"),
+                        (PARQUET_FIELD_ID_META_KEY, "5"),
+                    ])),
+                    false,
+                )
+                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
+                Field::new(
+                    "c22",
+                    DataType::FixedSizeList(
+                        Arc::new(Field::new("item", DataType::Boolean, true)),
+                        5,
+                    ),
+                    false,
+                ),
+                Field::new_list(
+                    "c23",
+                    Field::new_large_list(
+                        "inner",
+                        Field::new(
+                            "item",
+                            DataType::Struct(
+                                vec![
+                                    Field::new("a", DataType::Int16, true),
+                                    Field::new("b", DataType::Float64, false),
+                                ]
+                                .into(),
+                            ),
+                            false,
+                        ),
+                        true,
+                    ),
                     false,
                 ),
-                // Field::new(
-                //     "c22",
-                //     DataType::FixedSizeList(Box::new(DataType::Boolean), 5),
-                //     false,
-                // ),
-                // Field::new(
-                //     "c23",
-                //     DataType::List(Box::new(DataType::LargeList(Box::new(
-                //         DataType::Struct(vec![
-                //             Field::new("a", DataType::Int16, true),
-                //             Field::new("b", DataType::Float64, false),
-                //         ]),
-                //     )))),
-                //     true,
-                // ),
                 Field::new(
                     "c24",
                     DataType::Struct(Fields::from(vec![
@@ -1626,6 +1669,7 @@ mod tests {
                 ),
                 Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
                 Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
+                // Duration types not supported
                 // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
                 // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
                 // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
@@ -1639,19 +1683,29 @@ mod tests {
                     true,
                     123,
                     true,
-                ),
+                )
+                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
                 Field::new("c32", DataType::LargeBinary, true),
                 Field::new("c33", DataType::LargeUtf8, true),
-                // Field::new(
-                //     "c34",
-                //     DataType::LargeList(Box::new(DataType::List(Box::new(
-                //         DataType::Struct(vec![
-                //             Field::new("a", DataType::Int16, true),
-                //             Field::new("b", DataType::Float64, true),
-                //         ]),
-                //     )))),
-                //     true,
-                // ),
+                Field::new_large_list(
+                    "c34",
+                    Field::new_list(
+                        "inner",
+                        Field::new(
+                            "item",
+                            DataType::Struct(
+                                vec![
+                                    Field::new("a", DataType::Int16, true),
+                                    Field::new("b", DataType::Float64, true),
+                                ]
+                                .into(),
+                            ),
+                            true,
+                        ),
+                        true,
+                    ),
+                    true,
+                ),
                 Field::new("c35", DataType::Null, true),
                 Field::new("c36", DataType::Decimal128(2, 1), false),
                 Field::new("c37", DataType::Decimal256(50, 20), false),
@@ -1671,29 +1725,34 @@ mod tests {
                 Field::new_map(
                     "c40",
                     "my_entries",
-                    Field::new("my_key", DataType::Utf8, false),
+                    Field::new("my_key", DataType::Utf8, false)
+                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                     Field::new_list(
                         "my_value",
-                        Field::new("item", DataType::Utf8, true),
+                        Field::new("item", DataType::Utf8, true)
+                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                         true,
-                    ),
+                    )
+                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                     false, // fails to roundtrip keys_sorted
                     true,
-                ),
+                )
+                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
                 Field::new_map(
                     "c41",
                     "my_entries",
                     Field::new("my_key", DataType::Utf8, false),
                     Field::new_list(
                         "my_value",
-                        Field::new("item", DataType::Utf8, true),
+                        Field::new("item", DataType::Utf8, true)
+                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                         true,
                     ),
                     false, // fails to roundtrip keys_sorted
                     false,
                 ),
             ],
-            metadata,
+            meta(&[("Key", "Value")]),
         );
 
         // write to an empty parquet file so that schema is serialized
@@ -1707,9 +1766,48 @@ mod tests {
 
         // read file back
         let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+
+        // Check arrow schema
         let read_schema = arrow_reader.schema();
         assert_eq!(&schema, read_schema.as_ref());
 
+        // Walk schema finding field IDs
+        let mut stack = Vec::with_capacity(10);
+        let mut out = Vec::with_capacity(10);
+
+        let root = arrow_reader.parquet_schema().root_schema_ptr();
+        stack.push((root.name().to_string(), root));
+
+        while let Some((p, t)) = stack.pop() {
+            if t.is_group() {
+                for f in t.get_fields() {
+                    stack.push((format!("{p}.{}", f.name()), f.clone()))
+                }
+            }
+
+            let info = t.get_basic_info();
+            if info.has_id() {
+                out.push(format!("{p} -> {}", info.id()))
+            }
+        }
+        out.sort_unstable();
+        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
+
+        assert_eq!(
+            &out,
+            &[
+                "arrow_schema.c1 -> 2",
+                "arrow_schema.c21 -> 4",
+                "arrow_schema.c21.list.item -> 5",
+                "arrow_schema.c31 -> 6",
+                "arrow_schema.c40 -> 7",
+                "arrow_schema.c40.my_entries.my_key -> 8",
+                "arrow_schema.c40.my_entries.my_value -> 9",
+                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
+                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
+            ]
+        );
+
         Ok(())
     }
 

Reply via email to