This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5761fd2 feat: Implement the conversion from Iceberg Schema to Arrow Schema (#277)
5761fd2 is described below
commit 5761fd206fcef1ea47f6b03a8eeb598d1f4d612b
Author: ZENOTME <[email protected]>
AuthorDate: Fri Apr 19 21:56:16 2024 +0800
feat: Implement the conversion from Iceberg Schema to Arrow Schema (#277)
* support iceberg schema to arrow schema
* avoid realloc hashmap
---------
Co-authored-by: ZENOTME <[email protected]>
---
crates/iceberg/src/arrow/schema.rs | 870 +++++++++++++++++++++++++++++++------
1 file changed, 734 insertions(+), 136 deletions(-)
diff --git a/crates/iceberg/src/arrow/schema.rs
b/crates/iceberg/src/arrow/schema.rs
index 7e01b20..c7e8700 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -19,10 +19,15 @@
use crate::error::Result;
use crate::spec::{
- ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema,
StructType, Type,
+ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema,
SchemaVisitor,
+ StructType, Type,
};
use crate::{Error, ErrorKind};
+use arrow_array::types::{validate_decimal_precision_and_scale, Decimal128Type};
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+use rust_decimal::prelude::ToPrimitive;
+use std::collections::HashMap;
use std::sync::Arc;
/// A post order arrow schema visitor.
@@ -198,11 +203,10 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) ->
Result<Schema> {
visit_schema(schema, &mut visitor)
}
-const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id";
/// Arrow field-metadata key under which an Iceberg field's doc string is kept.
const ARROW_FIELD_DOC_KEY: &str = "doc";
fn get_field_id(field: &Field) -> Result<i32> {
- if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) {
+ if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
return value.parse::<i32>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
@@ -385,9 +389,230 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
}
}
+struct ToArrowSchemaConverter;
+
+enum ArrowSchemaOrFieldOrType {
+ Schema(ArrowSchema),
+ Field(Field),
+ Type(DataType),
+}
+
+impl SchemaVisitor for ToArrowSchemaConverter {
+ type T = ArrowSchemaOrFieldOrType;
+
+ fn schema(
+ &mut self,
+ _schema: &crate::spec::Schema,
+ value: ArrowSchemaOrFieldOrType,
+ ) -> crate::Result<ArrowSchemaOrFieldOrType> {
+ let struct_type = match value {
+ ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields,
+ _ => unreachable!(),
+ };
+ Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new(
+ struct_type,
+ )))
+ }
+
+ fn field(
+ &mut self,
+ field: &crate::spec::NestedFieldRef,
+ value: ArrowSchemaOrFieldOrType,
+ ) -> crate::Result<ArrowSchemaOrFieldOrType> {
+ let ty = match value {
+ ArrowSchemaOrFieldOrType::Type(ty) => ty,
+ _ => unreachable!(),
+ };
+ let metadata = if let Some(doc) = &field.doc {
+ HashMap::from([
+ (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()),
+ (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
+ ])
+ } else {
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
field.id.to_string())])
+ };
+ Ok(ArrowSchemaOrFieldOrType::Field(
+ Field::new(field.name.clone(), ty,
!field.required).with_metadata(metadata),
+ ))
+ }
+
+ fn r#struct(
+ &mut self,
+ _: &crate::spec::StructType,
+ results: Vec<ArrowSchemaOrFieldOrType>,
+ ) -> crate::Result<ArrowSchemaOrFieldOrType> {
+ let fields = results
+ .into_iter()
+ .map(|result| match result {
+ ArrowSchemaOrFieldOrType::Field(field) => field,
+ _ => unreachable!(),
+ })
+ .collect();
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)))
+ }
+
+ fn list(
+ &mut self,
+ list: &crate::spec::ListType,
+ value: ArrowSchemaOrFieldOrType,
+ ) -> crate::Result<Self::T> {
+ let field = match self.field(&list.element_field, value)? {
+ ArrowSchemaOrFieldOrType::Field(field) => field,
+ _ => unreachable!(),
+ };
+ let meta = if let Some(doc) = &list.element_field.doc {
+ HashMap::from([
+ (
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ list.element_field.id.to_string(),
+ ),
+ (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()),
+ ])
+ } else {
+ HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ list.element_field.id.to_string(),
+ )])
+ };
+ let field = field.with_metadata(meta);
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new(
+ field,
+ ))))
+ }
+
+ fn map(
+ &mut self,
+ map: &crate::spec::MapType,
+ key_value: ArrowSchemaOrFieldOrType,
+ value: ArrowSchemaOrFieldOrType,
+ ) -> crate::Result<ArrowSchemaOrFieldOrType> {
+ let key_field = match self.field(&map.key_field, key_value)? {
+ ArrowSchemaOrFieldOrType::Field(field) => field,
+ _ => unreachable!(),
+ };
+ let value_field = match self.field(&map.value_field, value)? {
+ ArrowSchemaOrFieldOrType::Field(field) => field,
+ _ => unreachable!(),
+ };
+ let field = Field::new(
+ "entries",
+ DataType::Struct(vec![key_field, value_field].into()),
+ map.value_field.required,
+ );
+
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
+ field.into(),
+ false,
+ )))
+ }
+
+ fn primitive(
+ &mut self,
+ p: &crate::spec::PrimitiveType,
+ ) -> crate::Result<ArrowSchemaOrFieldOrType> {
+ match p {
+ crate::spec::PrimitiveType::Boolean => {
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean))
+ }
+ crate::spec::PrimitiveType::Int =>
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)),
+ crate::spec::PrimitiveType::Long =>
Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)),
+ crate::spec::PrimitiveType::Float => {
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32))
+ }
+ crate::spec::PrimitiveType::Double => {
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64))
+ }
+ crate::spec::PrimitiveType::Decimal { precision, scale } => {
+ let (precision, scale) = {
+ let precision: u8 =
precision.to_owned().try_into().map_err(|err| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "incompatible precision for decimal type convert",
+ )
+ .with_source(err)
+ })?;
+ let scale = scale.to_owned().try_into().map_err(|err| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "incompatible scale for decimal type convert",
+ )
+ .with_source(err)
+ })?;
+ (precision, scale)
+ };
+
validate_decimal_precision_and_scale::<Decimal128Type>(precision,
scale).map_err(
+ |err| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "incompatible precision and scale for decimal type
convert",
+ )
+ .with_source(err)
+ },
+ )?;
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128(
+ precision, scale,
+ )))
+ }
+ crate::spec::PrimitiveType::Date => {
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
+ }
+ crate::spec::PrimitiveType::Time =>
Ok(ArrowSchemaOrFieldOrType::Type(
+ DataType::Time32(TimeUnit::Microsecond),
+ )),
+ crate::spec::PrimitiveType::Timestamp =>
Ok(ArrowSchemaOrFieldOrType::Type(
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ )),
+ crate::spec::PrimitiveType::Timestamptz =>
Ok(ArrowSchemaOrFieldOrType::Type(
+ // Timestampz always stored as UTC
+ DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
+ )),
+ crate::spec::PrimitiveType::String => {
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8))
+ }
+ crate::spec::PrimitiveType::Uuid =>
Ok(ArrowSchemaOrFieldOrType::Type(
+ DataType::FixedSizeBinary(16),
+ )),
+ crate::spec::PrimitiveType::Fixed(len) =>
Ok(ArrowSchemaOrFieldOrType::Type(
+ len.to_i32()
+ .map(DataType::FixedSizeBinary)
+ .unwrap_or(DataType::LargeBinary),
+ )),
+ crate::spec::PrimitiveType::Binary => {
+ Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary))
+ }
+ }
+ }
+}
+
+/// Convert iceberg schema to an arrow schema.
+pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) ->
crate::Result<ArrowSchema> {
+ let mut converter = ToArrowSchemaConverter;
+ match crate::spec::visit_schema(schema, &mut converter)? {
+ ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema),
+ _ => unreachable!(),
+ }
+}
+
+impl TryFrom<&ArrowSchema> for crate::spec::Schema {
+ type Error = Error;
+
+ fn try_from(schema: &ArrowSchema) -> crate::Result<Self> {
+ arrow_schema_to_schema(schema)
+ }
+}
+
+impl TryFrom<&crate::spec::Schema> for ArrowSchema {
+ type Error = Error;
+
+ fn try_from(schema: &crate::spec::Schema) -> crate::Result<Self> {
+ schema_to_arrow_schema(schema)
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
+ use crate::spec::Schema;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Schema as ArrowSchema;
@@ -395,15 +620,14 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;
- #[test]
- fn test_arrow_schema_to_schema() {
+ fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
let fields = Fields::from(vec![
Field::new("key", DataType::Int32,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
+ PARQUET_FIELD_ID_META_KEY.to_string(),
"17".to_string(),
)])),
Field::new("value", DataType::Utf8,
true).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
+ PARQUET_FIELD_ID_META_KEY.to_string(),
"18".to_string(),
)])),
]);
@@ -412,7 +636,7 @@ mod tests {
let map = DataType::Map(
Arc::new(
Field::new("entries", r#struct,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
+ PARQUET_FIELD_ID_META_KEY.to_string(),
"19".to_string(),
)])),
),
@@ -421,11 +645,11 @@ mod tests {
let fields = Fields::from(vec![
Field::new("aa", DataType::Int32,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
+ PARQUET_FIELD_ID_META_KEY.to_string(),
"18".to_string(),
)])),
Field::new("bb", DataType::Utf8,
true).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
+ PARQUET_FIELD_ID_META_KEY.to_string(),
"19".to_string(),
)])),
Field::new(
@@ -434,141 +658,145 @@ mod tests {
false,
)
.with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
+ PARQUET_FIELD_ID_META_KEY.to_string(),
"20".to_string(),
)])),
]);
let r#struct = DataType::Struct(fields);
- let schema =
- ArrowSchema::new(vec![
- Field::new("a", DataType::Int32,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "2".to_string(),
- )])),
- Field::new("b", DataType::Int64,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "1".to_string(),
- )])),
- Field::new("c", DataType::Utf8,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "3".to_string(),
- )])),
- Field::new("n", DataType::LargeUtf8,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "21".to_string(),
- )])),
- Field::new("d", DataType::Timestamp(TimeUnit::Microsecond,
None), true)
- .with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "4".to_string(),
+ ArrowSchema::new(vec![
+ Field::new("a", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )])),
+ Field::new("b", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ Field::new("c", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "3".to_string(),
+ )])),
+ Field::new("n", DataType::LargeUtf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "21".to_string(),
+ )])),
+ Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None),
true).with_metadata(
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string())]),
+ ),
+ Field::new("e", DataType::Boolean,
true).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "6".to_string(),
+ )])),
+ Field::new("f", DataType::Float32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "5".to_string(),
+ )])),
+ Field::new("g", DataType::Float64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "7".to_string(),
+ )])),
+ Field::new("p", DataType::Decimal128(10, 2),
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "27".to_string(),
+ )])),
+ Field::new("h", DataType::Date32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "8".to_string(),
+ )])),
+ Field::new("i", DataType::Time64(TimeUnit::Microsecond),
false).with_metadata(
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"9".to_string())]),
+ ),
+ Field::new(
+ "j",
+ DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+ false,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "10".to_string(),
+ )])),
+ Field::new(
+ "k",
+ DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
+ false,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "12".to_string(),
+ )])),
+ Field::new("l", DataType::Binary,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "13".to_string(),
+ )])),
+ Field::new("o", DataType::LargeBinary,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "22".to_string(),
+ )])),
+ Field::new("m", DataType::FixedSizeBinary(10),
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "11".to_string(),
+ )])),
+ Field::new(
+ "list",
+ DataType::List(Arc::new(
+ Field::new("element", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "15".to_string(),
)])),
- Field::new("e", DataType::Boolean,
true).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "6".to_string(),
- )])),
- Field::new("f", DataType::Float32,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "5".to_string(),
- )])),
- Field::new("g", DataType::Float64,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "7".to_string(),
- )])),
- Field::new("p", DataType::Decimal128(10, 2),
false).with_metadata(HashMap::from([
- (ARROW_FIELD_ID_KEY.to_string(), "27".to_string()),
- ])),
- Field::new("h", DataType::Date32,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "8".to_string(),
- )])),
- Field::new("i", DataType::Time64(TimeUnit::Microsecond),
false).with_metadata(
- HashMap::from([(ARROW_FIELD_ID_KEY.to_string(),
"9".to_string())]),
- ),
- Field::new(
- "j",
- DataType::Timestamp(TimeUnit::Microsecond,
Some("UTC".into())),
- false,
- )
- .with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "10".to_string(),
- )])),
- Field::new(
- "k",
- DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
- false,
- )
- .with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "12".to_string(),
- )])),
- Field::new("l", DataType::Binary,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "13".to_string(),
- )])),
- Field::new("o", DataType::LargeBinary,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "22".to_string(),
- )])),
- Field::new("m", DataType::FixedSizeBinary(10),
false).with_metadata(HashMap::from(
- [(ARROW_FIELD_ID_KEY.to_string(), "11".to_string())],
)),
- Field::new(
- "list",
- DataType::List(Arc::new(
- Field::new("element", DataType::Int32,
false).with_metadata(HashMap::from(
- [(ARROW_FIELD_ID_KEY.to_string(),
"15".to_string())],
- )),
- )),
- true,
- )
- .with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "14".to_string(),
- )])),
- Field::new(
- "large_list",
- DataType::LargeList(Arc::new(
- Field::new("element", DataType::Utf8,
false).with_metadata(HashMap::from(
- [(ARROW_FIELD_ID_KEY.to_string(),
"23".to_string())],
- )),
- )),
- true,
- )
- .with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "24".to_string(),
- )])),
- Field::new(
- "fixed_list",
- DataType::FixedSizeList(
- Arc::new(
- Field::new("element", DataType::Binary,
false).with_metadata(
-
HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "26".to_string())]),
- ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "14".to_string(),
+ )])),
+ Field::new(
+ "large_list",
+ DataType::LargeList(Arc::new(
+ Field::new("element", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "23".to_string(),
+ )])),
+ )),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "24".to_string(),
+ )])),
+ Field::new(
+ "fixed_list",
+ DataType::FixedSizeList(
+ Arc::new(
+ Field::new("element", DataType::Binary,
false).with_metadata(
+ HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "26".to_string(),
+ )]),
),
- 10,
),
- true,
- )
- .with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "25".to_string(),
- )])),
- Field::new("map", map, false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "16".to_string(),
- )])),
- Field::new("struct", r#struct,
false).with_metadata(HashMap::from([(
- ARROW_FIELD_ID_KEY.to_string(),
- "17".to_string(),
- )])),
- ]);
- let schema = Arc::new(schema);
- let result = arrow_schema_to_schema(&schema).unwrap();
+ 10,
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "25".to_string(),
+ )])),
+ Field::new("map", map, false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "16".to_string(),
+ )])),
+ Field::new("struct", r#struct,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "17".to_string(),
+ )])),
+ ])
+ }
+ fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema {
let schema_json = r#"{
"type":"struct",
"schema-id":0,
@@ -747,7 +975,377 @@ mod tests {
"identifier-field-ids":[]
}"#;
- let expected_type: Schema = serde_json::from_str(schema_json).unwrap();
- assert_eq!(result, expected_type);
+ let schema: Schema = serde_json::from_str(schema_json).unwrap();
+ schema
+ }
+
#[test]
fn test_arrow_schema_to_schema() {
    // Arrow → Iceberg: the converted schema must equal the hand-written expectation.
    let expected = iceberg_schema_for_arrow_schema_to_schema_test();
    let input = arrow_schema_for_arrow_schema_to_schema_test();
    let actual = arrow_schema_to_schema(&input).unwrap();
    assert_eq!(actual, expected);
}
+
+ fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
+ let fields = Fields::from(vec![
+ Field::new("key", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "17".to_string(),
+ )])),
+ Field::new("value", DataType::Utf8,
true).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "18".to_string(),
+ )])),
+ ]);
+
+ let r#struct = DataType::Struct(fields);
+ let map = DataType::Map(Arc::new(Field::new("entries", r#struct,
false)), false);
+
+ let fields = Fields::from(vec![
+ Field::new("aa", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "18".to_string(),
+ )])),
+ Field::new("bb", DataType::Utf8,
true).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "19".to_string(),
+ )])),
+ Field::new(
+ "cc",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ false,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "20".to_string(),
+ )])),
+ ]);
+
+ let r#struct = DataType::Struct(fields);
+
+ ArrowSchema::new(vec![
+ Field::new("a", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )])),
+ Field::new("b", DataType::Int64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ Field::new("c", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "3".to_string(),
+ )])),
+ Field::new("n", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "21".to_string(),
+ )])),
+ Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None),
true).with_metadata(
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"4".to_string())]),
+ ),
+ Field::new("e", DataType::Boolean,
true).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "6".to_string(),
+ )])),
+ Field::new("f", DataType::Float32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "5".to_string(),
+ )])),
+ Field::new("g", DataType::Float64,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "7".to_string(),
+ )])),
+ Field::new("p", DataType::Decimal128(10, 2),
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "27".to_string(),
+ )])),
+ Field::new("h", DataType::Date32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "8".to_string(),
+ )])),
+ Field::new("i", DataType::Time32(TimeUnit::Microsecond),
false).with_metadata(
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"9".to_string())]),
+ ),
+ Field::new(
+ "j",
+ DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
+ false,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "10".to_string(),
+ )])),
+ Field::new(
+ "k",
+ DataType::Timestamp(TimeUnit::Microsecond,
Some("+00:00".into())),
+ false,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "12".to_string(),
+ )])),
+ Field::new("l", DataType::LargeBinary,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "13".to_string(),
+ )])),
+ Field::new("o", DataType::LargeBinary,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "22".to_string(),
+ )])),
+ Field::new("m", DataType::FixedSizeBinary(10),
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "11".to_string(),
+ )])),
+ Field::new(
+ "list",
+ DataType::List(Arc::new(
+ Field::new("element", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "15".to_string(),
+ )])),
+ )),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "14".to_string(),
+ )])),
+ Field::new(
+ "large_list",
+ DataType::List(Arc::new(
+ Field::new("element", DataType::Utf8,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "23".to_string(),
+ )])),
+ )),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "24".to_string(),
+ )])),
+ Field::new(
+ "fixed_list",
+ DataType::List(Arc::new(
+ Field::new("element", DataType::LargeBinary,
false).with_metadata(
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"26".to_string())]),
+ ),
+ )),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "25".to_string(),
+ )])),
+ Field::new("map", map, false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "16".to_string(),
+ )])),
+ Field::new("struct", r#struct,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "17".to_string(),
+ )])),
+ Field::new("uuid", DataType::FixedSizeBinary(16),
false).with_metadata(HashMap::from(
+ [(PARQUET_FIELD_ID_META_KEY.to_string(), "26".to_string())],
+ )),
+ ])
+ }
+
+ fn iceberg_schema_for_schema_to_arrow_schema() -> Schema {
+ let schema_json = r#"{
+ "type":"struct",
+ "schema-id":0,
+ "fields":[
+ {
+ "id":2,
+ "name":"a",
+ "required":true,
+ "type":"int"
+ },
+ {
+ "id":1,
+ "name":"b",
+ "required":true,
+ "type":"long"
+ },
+ {
+ "id":3,
+ "name":"c",
+ "required":true,
+ "type":"string"
+ },
+ {
+ "id":21,
+ "name":"n",
+ "required":true,
+ "type":"string"
+ },
+ {
+ "id":4,
+ "name":"d",
+ "required":false,
+ "type":"timestamp"
+ },
+ {
+ "id":6,
+ "name":"e",
+ "required":false,
+ "type":"boolean"
+ },
+ {
+ "id":5,
+ "name":"f",
+ "required":true,
+ "type":"float"
+ },
+ {
+ "id":7,
+ "name":"g",
+ "required":true,
+ "type":"double"
+ },
+ {
+ "id":27,
+ "name":"p",
+ "required":true,
+ "type":"decimal(10,2)"
+ },
+ {
+ "id":8,
+ "name":"h",
+ "required":true,
+ "type":"date"
+ },
+ {
+ "id":9,
+ "name":"i",
+ "required":true,
+ "type":"time"
+ },
+ {
+ "id":10,
+ "name":"j",
+ "required":true,
+ "type":"timestamptz"
+ },
+ {
+ "id":12,
+ "name":"k",
+ "required":true,
+ "type":"timestamptz"
+ },
+ {
+ "id":13,
+ "name":"l",
+ "required":true,
+ "type":"binary"
+ },
+ {
+ "id":22,
+ "name":"o",
+ "required":true,
+ "type":"binary"
+ },
+ {
+ "id":11,
+ "name":"m",
+ "required":true,
+ "type":"fixed[10]"
+ },
+ {
+ "id":14,
+ "name":"list",
+ "required": false,
+ "type": {
+ "type": "list",
+ "element-id": 15,
+ "element-required": true,
+ "element": "int"
+ }
+ },
+ {
+ "id":24,
+ "name":"large_list",
+ "required": false,
+ "type": {
+ "type": "list",
+ "element-id": 23,
+ "element-required": true,
+ "element": "string"
+ }
+ },
+ {
+ "id":25,
+ "name":"fixed_list",
+ "required": false,
+ "type": {
+ "type": "list",
+ "element-id": 26,
+ "element-required": true,
+ "element": "binary"
+ }
+ },
+ {
+ "id":16,
+ "name":"map",
+ "required": true,
+ "type": {
+ "type": "map",
+ "key-id": 17,
+ "key": "int",
+ "value-id": 18,
+ "value-required": false,
+ "value": "string"
+ }
+ },
+ {
+ "id":17,
+ "name":"struct",
+ "required": true,
+ "type": {
+ "type": "struct",
+ "fields": [
+ {
+ "id":18,
+ "name":"aa",
+ "required":true,
+ "type":"int"
+ },
+ {
+ "id":19,
+ "name":"bb",
+ "required":false,
+ "type":"string"
+ },
+ {
+ "id":20,
+ "name":"cc",
+ "required":true,
+ "type":"timestamp"
+ }
+ ]
+ }
+ },
+ {
+ "id":26,
+ "name":"uuid",
+ "required":true,
+ "type":"uuid"
+ }
+ ],
+ "identifier-field-ids":[]
+ }"#;
+
+ let schema: Schema = serde_json::from_str(schema_json).unwrap();
+ schema
+ }
+
#[test]
fn test_schema_to_arrow_schema() {
    // Iceberg → Arrow: the converted schema must equal the hand-built Arrow expectation.
    let expected = arrow_schema_for_schema_to_arrow_schema_test();
    let input = iceberg_schema_for_schema_to_arrow_schema();
    let actual = schema_to_arrow_schema(&input).unwrap();
    assert_eq!(actual, expected);
}
}