This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new b810e8f207 Support Field ID in ArrowWriter (#4702) (#4710)
b810e8f207 is described below
commit b810e8f207bbc70294b01acba4be32153c18a6ab
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Aug 17 15:05:42 2023 +0100
Support Field ID in ArrowWriter (#4702) (#4710)
---
parquet/src/arrow/mod.rs | 7 ++
parquet/src/arrow/schema/mod.rs | 200 ++++++++++++++++++++++++++++++----------
2 files changed, 156 insertions(+), 51 deletions(-)
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index aad4925c7c..8cca79b40e 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -130,6 +130,13 @@ pub use self::schema::{
/// Schema metadata key used to store serialized Arrow IPC schema
pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
+/// The value of this metadata key, if present on [`Field::metadata`], will be used
+/// to populate [`BasicTypeInfo::id`]
+///
+/// [`Field::metadata`]: arrow_schema::Field::metadata
+/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id
+pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
+
/// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project
///
/// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices
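
For context beyond the patch itself: this new constant is read by the writer when converting the Arrow schema, so attaching it as field metadata is all a caller needs to do. Below is a minimal sketch of that usage (not code from this commit), assuming the public `parquet::arrow` and `arrow_schema` APIs; the field name and ID value are illustrative only.

    use std::collections::HashMap;

    use arrow_schema::{DataType, Field};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

    // Attach a Parquet field ID of 2 to an Arrow field; the writer picks the
    // value up from this metadata entry when building the Parquet schema.
    fn field_with_id() -> Field {
        Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "2".to_string(),
        )]))
    }
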
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index bcfc2f884c..3f1994d108 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -295,14 +295,17 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
} else {
Repetition::REQUIRED
};
+ let id = field_id(field);
// create type from field
match field.data_type() {
DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Unknown))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Integer {
@@ -310,6 +313,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
is_signed: true,
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Integer {
@@ -317,12 +321,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
is_signed: true,
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Integer {
@@ -330,6 +337,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
is_signed: false,
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Integer {
@@ -337,6 +345,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
is_signed: false,
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Integer {
@@ -344,6 +353,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
is_signed: false,
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::Integer {
@@ -351,18 +361,22 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
is_signed: false,
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Float16 => Err(arrow_err!("Float16 arrays not supported")),
DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Timestamp(TimeUnit::Second, _) => {
// Cannot represent seconds in LogicalType
Type::primitive_type_builder(name, PhysicalType::INT64)
.with_repetition(repetition)
+ .with_id(id)
.build()
}
DataType::Timestamp(time_unit, tz) => {
@@ -384,21 +398,25 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
},
}))
.with_repetition(repetition)
+ .with_id(id)
.build()
}
DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
+ .with_id(id)
.build(),
// date64 is cast to date32 (#1666)
DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32)
.with_logical_type(Some(LogicalType::Date))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Time32(TimeUnit::Second) => {
// Cannot represent seconds in LogicalType
Type::primitive_type_builder(name, PhysicalType::INT32)
.with_repetition(repetition)
+ .with_id(id)
.build()
}
DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
@@ -410,6 +428,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
},
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
.with_logical_type(Some(LogicalType::Time {
@@ -421,6 +440,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
},
}))
.with_repetition(repetition)
+ .with_id(id)
.build(),
DataType::Duration(_) => {
Err(arrow_err!("Converting Duration to parquet not supported",))
@@ -429,17 +449,20 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_converted_type(ConvertedType::INTERVAL)
.with_repetition(repetition)
+ .with_id(id)
.with_length(12)
.build()
}
DataType::Binary | DataType::LargeBinary => {
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_repetition(repetition)
+ .with_id(id)
.build()
}
DataType::FixedSizeBinary(length) => {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(repetition)
+ .with_id(id)
.with_length(*length)
.build()
}
@@ -459,6 +482,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
};
Type::primitive_type_builder(name, physical_type)
.with_repetition(repetition)
+ .with_id(id)
.with_length(length)
.with_logical_type(Some(LogicalType::Decimal {
scale: *scale as i32,
@@ -472,6 +496,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
.with_logical_type(Some(LogicalType::String))
.with_repetition(repetition)
+ .with_id(id)
.build()
}
DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
@@ -484,6 +509,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
)])
.with_logical_type(Some(LogicalType::List))
.with_repetition(repetition)
+ .with_id(id)
.build()
}
DataType::Struct(fields) => {
@@ -500,6 +526,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
Type::group_type_builder(name)
.with_fields(fields)
.with_repetition(repetition)
+ .with_id(id)
.build()
}
DataType::Map(field, _) => {
@@ -508,22 +535,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_fields(vec![Arc::new(
Type::group_type_builder(field.name())
.with_fields(vec![
- Arc::new(arrow_to_parquet_type(&Field::new(
- struct_fields[0].name(),
- struct_fields[0].data_type().clone(),
- false,
- ))?),
- Arc::new(arrow_to_parquet_type(&Field::new(
- struct_fields[1].name(),
- struct_fields[1].data_type().clone(),
- struct_fields[1].is_nullable(),
- ))?),
+ Arc::new(arrow_to_parquet_type(&struct_fields[0])?),
+ Arc::new(arrow_to_parquet_type(&struct_fields[1])?),
])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
.with_logical_type(Some(LogicalType::Map))
.with_repetition(repetition)
+ .with_id(id)
.build()
} else {
Err(arrow_err!(
@@ -543,6 +563,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
}
}
+fn field_id(field: &Field) -> Option<i32> {
+ let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
+ value.parse().ok() // Fail quietly if not a valid integer
+}
+
#[cfg(test)]
mod tests {
use super::*;
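
A hypothetical illustration (not part of the patch) of the quiet-failure behaviour of the new `field_id` helper: a metadata value that does not parse as an integer simply yields no field ID rather than an error. The `id_of` function below mirrors the private helper; field names and values are made up for the example.

    use std::collections::HashMap;

    use arrow_schema::{DataType, Field};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

    // Same lookup-and-parse logic as the private `field_id` helper above.
    fn id_of(field: &Field) -> Option<i32> {
        field.metadata().get(PARQUET_FIELD_ID_META_KEY)?.parse().ok()
    }

    fn main() {
        let meta = |v: &str| HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), v.to_string())]);
        let good = Field::new("a", DataType::Int32, false).with_metadata(meta("7"));
        let bad = Field::new("b", DataType::Int32, false).with_metadata(meta("not-a-number"));
        assert_eq!(id_of(&good), Some(7)); // valid integer -> Some(7)
        assert_eq!(id_of(&bad), None);     // invalid value fails quietly -> None
    }
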
@@ -551,6 +576,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
+ use crate::arrow::PARQUET_FIELD_ID_META_KEY;
use crate::file::metadata::KeyValue;
use crate::{
arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
@@ -1555,17 +1581,18 @@ mod tests {
#[test]
fn test_arrow_schema_roundtrip() -> Result<()> {
- // This tests the roundtrip of an Arrow schema
- // Fields that are commented out fail roundtrip tests or are unsupported by the writer
- let metadata: HashMap<String, String> =
- [("Key".to_string(), "Value".to_string())]
- .iter()
- .cloned()
- .collect();
+ let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
+ a.iter()
+ .map(|(a, b)| (a.to_string(), b.to_string()))
+ .collect()
+ };
let schema = Schema::new_with_metadata(
vec![
- Field::new("c1", DataType::Utf8, false),
+ Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[
+ ("Key", "Foo"),
+ (PARQUET_FIELD_ID_META_KEY, "2"),
+ ])),
Field::new("c2", DataType::Binary, false),
Field::new("c3", DataType::FixedSizeBinary(3), false),
Field::new("c4", DataType::Boolean, false),
@@ -1598,24 +1625,40 @@ mod tests {
Field::new("c20", DataType::Interval(IntervalUnit::YearMonth),
false),
Field::new_list(
"c21",
- Field::new("list", DataType::Boolean, true),
+ Field::new("item", DataType::Boolean,
true).with_metadata(meta(&[
+ ("Key", "Bar"),
+ (PARQUET_FIELD_ID_META_KEY, "5"),
+ ])),
+ false,
+ )
+ .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
+ Field::new(
+ "c22",
+ DataType::FixedSizeList(
+ Arc::new(Field::new("item", DataType::Boolean, true)),
+ 5,
+ ),
+ false,
+ ),
+ Field::new_list(
+ "c23",
+ Field::new_large_list(
+ "inner",
+ Field::new(
+ "item",
+ DataType::Struct(
+ vec![
+ Field::new("a", DataType::Int16, true),
+ Field::new("b", DataType::Float64, false),
+ ]
+ .into(),
+ ),
+ false,
+ ),
+ true,
+ ),
false,
),
- // Field::new(
- // "c22",
- // DataType::FixedSizeList(Box::new(DataType::Boolean), 5),
- // false,
- // ),
- // Field::new(
- // "c23",
- // DataType::List(Box::new(DataType::LargeList(Box::new(
- // DataType::Struct(vec![
- // Field::new("a", DataType::Int16, true),
- // Field::new("b", DataType::Float64, false),
- // ]),
- // )))),
- // true,
- // ),
Field::new(
"c24",
DataType::Struct(Fields::from(vec![
@@ -1626,6 +1669,7 @@ mod tests {
),
Field::new("c25", DataType::Interval(IntervalUnit::YearMonth),
true),
Field::new("c26", DataType::Interval(IntervalUnit::DayTime),
true),
+ // Duration types not supported
// Field::new("c27", DataType::Duration(TimeUnit::Second),
false),
// Field::new("c28",
DataType::Duration(TimeUnit::Millisecond), false),
// Field::new("c29",
DataType::Duration(TimeUnit::Microsecond), false),
@@ -1639,19 +1683,29 @@ mod tests {
true,
123,
true,
- ),
+ )
+ .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
Field::new("c32", DataType::LargeBinary, true),
Field::new("c33", DataType::LargeUtf8, true),
- // Field::new(
- // "c34",
- // DataType::LargeList(Box::new(DataType::List(Box::new(
- // DataType::Struct(vec![
- // Field::new("a", DataType::Int16, true),
- // Field::new("b", DataType::Float64, true),
- // ]),
- // )))),
- // true,
- // ),
+ Field::new_large_list(
+ "c34",
+ Field::new_list(
+ "inner",
+ Field::new(
+ "item",
+ DataType::Struct(
+ vec![
+ Field::new("a", DataType::Int16, true),
+ Field::new("b", DataType::Float64, true),
+ ]
+ .into(),
+ ),
+ true,
+ ),
+ true,
+ ),
+ true,
+ ),
Field::new("c35", DataType::Null, true),
Field::new("c36", DataType::Decimal128(2, 1), false),
Field::new("c37", DataType::Decimal256(50, 20), false),
@@ -1671,29 +1725,34 @@ mod tests {
Field::new_map(
"c40",
"my_entries",
- Field::new("my_key", DataType::Utf8, false),
+ Field::new("my_key", DataType::Utf8, false)
+ .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY,
"8")])),
Field::new_list(
"my_value",
- Field::new("item", DataType::Utf8, true),
+ Field::new("item", DataType::Utf8, true)
+ .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY,
"10")])),
true,
- ),
+ )
+ .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
false, // fails to roundtrip keys_sorted
true,
- ),
+ )
+ .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
Field::new_map(
"c41",
"my_entries",
Field::new("my_key", DataType::Utf8, false),
Field::new_list(
"my_value",
- Field::new("item", DataType::Utf8, true),
+ Field::new("item", DataType::Utf8, true)
+ .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY,
"11")])),
true,
),
false, // fails to roundtrip keys_sorted
false,
),
],
- metadata,
+ meta(&[("Key", "Value")]),
);
// write to an empty parquet file so that schema is serialized
@@ -1707,9 +1766,48 @@ mod tests {
// read file back
let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+
+ // Check arrow schema
let read_schema = arrow_reader.schema();
assert_eq!(&schema, read_schema.as_ref());
+ // Walk schema finding field IDs
+ let mut stack = Vec::with_capacity(10);
+ let mut out = Vec::with_capacity(10);
+
+ let root = arrow_reader.parquet_schema().root_schema_ptr();
+ stack.push((root.name().to_string(), root));
+
+ while let Some((p, t)) = stack.pop() {
+ if t.is_group() {
+ for f in t.get_fields() {
+ stack.push((format!("{p}.{}", f.name()), f.clone()))
+ }
+ }
+
+ let info = t.get_basic_info();
+ if info.has_id() {
+ out.push(format!("{p} -> {}", info.id()))
+ }
+ }
+ out.sort_unstable();
+ let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
+
+ assert_eq!(
+ &out,
+ &[
+ "arrow_schema.c1 -> 2",
+ "arrow_schema.c21 -> 4",
+ "arrow_schema.c21.list.item -> 5",
+ "arrow_schema.c31 -> 6",
+ "arrow_schema.c40 -> 7",
+ "arrow_schema.c40.my_entries.my_key -> 8",
+ "arrow_schema.c40.my_entries.my_value -> 9",
+ "arrow_schema.c40.my_entries.my_value.list.item -> 10",
+ "arrow_schema.c41.my_entries.my_value.list.item -> 11",
+ ]
+ );
+
Ok(())
}