This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bf9f3c  fix: complete miss attribute for map && list in avro schema (#411)
5bf9f3c is described below

commit 5bf9f3cbee0320a8c67e320291c306033bacee2c
Author: ZENOTME <[email protected]>
AuthorDate: Thu Aug 8 23:33:12 2024 +0800

    fix: complete miss attribute for map && list in avro schema (#411)
    
    * add miss attr in list/map avro schema
    
    * refine error handle
    
    * fix unused warn
    
    * fix typos
    
    * update avro and unittest
    
    * refine check_schema_conversion
    
    ---------
    
    Co-authored-by: ZENOTME <[email protected]>
---
 Cargo.toml                           |   2 +-
 crates/iceberg/src/avro/schema.rs    | 737 +++++++++++++++++++++++------------
 crates/iceberg/src/spec/datatypes.rs |  13 +
 3 files changed, 501 insertions(+), 251 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4826da2..2ec7ef1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,7 +39,7 @@ rust-version = "1.77.1"
 
 [workspace.dependencies]
 anyhow = "1.0.72"
-apache-avro = "0.16"
+apache-avro = "0.17"
 array-init = "2"
 arrow-arith = { version = "52" }
 arrow-array = { version = "52" }
diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs
index 653f52a..7f81427 100644
--- a/crates/iceberg/src/avro/schema.rs
+++ b/crates/iceberg/src/avro/schema.rs
@@ -19,22 +19,26 @@
 use std::collections::BTreeMap;
 
 use apache_avro::schema::{
-    DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField, 
RecordFieldOrder,
-    RecordSchema, UnionSchema,
+    ArraySchema, DecimalSchema, FixedSchema, MapSchema, Name, RecordField as 
AvroRecordField,
+    RecordFieldOrder, RecordSchema, UnionSchema,
 };
 use apache_avro::Schema as AvroSchema;
 use itertools::{Either, Itertools};
 use serde_json::{Number, Value};
 
 use crate::spec::{
-    visit_schema, ListType, MapType, NestedFieldRef, PrimitiveType, Schema, 
SchemaVisitor,
-    StructType,
+    visit_schema, ListType, MapType, NestedField, NestedFieldRef, 
PrimitiveType, Schema,
+    SchemaVisitor, StructType, Type,
 };
-use crate::{Error, ErrorKind, Result};
+use crate::{ensure_data_valid, Error, ErrorKind, Result};
 
+const ELEMENT_ID: &str = "element-id";
 const FILED_ID_PROP: &str = "field-id";
+const KEY_ID: &str = "key-id";
+const VALUE_ID: &str = "value-id";
 const UUID_BYTES: usize = 16;
 const UUID_LOGICAL_TYPE: &str = "uuid";
+const MAP_LOGICAL_TYPE: &str = "map";
 // # TODO: https://github.com/apache/iceberg-rust/issues/86
 // This const may better to maintain in avro-rs.
 const LOGICAL_TYPE: &str = "logicalType";
@@ -124,8 +128,13 @@ impl SchemaVisitor for SchemaToAvroSchema {
             field_schema = avro_optional(field_schema)?;
         }
 
-        // TODO: We need to add element id prop here, but rust's avro schema 
doesn't support property except record schema.
-        Ok(Either::Left(AvroSchema::Array(Box::new(field_schema))))
+        Ok(Either::Left(AvroSchema::Array(ArraySchema {
+            items: Box::new(field_schema),
+            attributes: BTreeMap::from([(
+                ELEMENT_ID.to_string(),
+                Value::Number(Number::from(list.element_field.id)),
+            )]),
+        })))
     }
 
     fn map(
@@ -141,7 +150,19 @@ impl SchemaVisitor for SchemaToAvroSchema {
         }
 
         if matches!(key_field_schema, AvroSchema::String) {
-            Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema))))
+            Ok(Either::Left(AvroSchema::Map(MapSchema {
+                types: Box::new(value_field_schema),
+                attributes: BTreeMap::from([
+                    (
+                        KEY_ID.to_string(),
+                        Value::Number(Number::from(map.key_field.id)),
+                    ),
+                    (
+                        VALUE_ID.to_string(),
+                        Value::Number(Number::from(map.value_field.id)),
+                    ),
+                ]),
+            })))
         } else {
             // Avro map requires that key must be string type. Here we convert 
it to array if key is
             // not string type.
@@ -187,7 +208,13 @@ impl SchemaVisitor for SchemaToAvroSchema {
                 fields,
             )?;
 
-            Ok(Either::Left(AvroSchema::Array(item_avro_schema.into())))
+            Ok(Either::Left(AvroSchema::Array(ArraySchema {
+                items: Box::new(item_avro_schema),
+                attributes: BTreeMap::from([(
+                    LOGICAL_TYPE.to_string(),
+                    Value::String(MAP_LOGICAL_TYPE.to_string()),
+                )]),
+            })))
         }
     }
 
@@ -255,6 +282,7 @@ pub(crate) fn avro_fixed_schema(len: usize, logical_type: Option<&str>) -> Resul
         doc: None,
         size: len,
         attributes,
+        default: None,
     }))
 }
 
@@ -274,6 +302,7 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<Avro
             doc: None,
             size: crate::spec::Type::decimal_required_bytes(precision as u32)? 
as usize,
             attributes: Default::default(),
+            default: None,
         })),
     }))
 }
@@ -285,266 +314,329 @@ fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
     ])?))
 }
 
-#[cfg(test)]
-mod tests {
-    use std::fs::read_to_string;
-
-    use apache_avro::schema::{Namespace, UnionSchema};
-    use apache_avro::Schema as AvroSchema;
-
-    use super::*;
-    use crate::ensure_data_valid;
-    use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, 
StructType, Type};
-
-    fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
-        match avro_schema {
-            AvroSchema::Union(union) => union.is_nullable(),
-            _ => false,
-        }
+fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
+    match avro_schema {
+        AvroSchema::Union(union) => union.is_nullable(),
+        _ => false,
     }
+}
 
-    /// Post order avro schema visitor.
-    pub(crate) trait AvroSchemaVisitor {
-        type T;
+/// Post order avro schema visitor.
+pub(crate) trait AvroSchemaVisitor {
+    type T;
 
-        fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) -> 
Result<Self::T>;
+    fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) -> 
Result<Self::T>;
 
-        fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) -> 
Result<Self::T>;
+    fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) -> 
Result<Self::T>;
 
-        fn array(&mut self, array: &AvroSchema, item: Self::T) -> 
Result<Self::T>;
-        fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result<Self::T>;
+    fn array(&mut self, array: &ArraySchema, item: Self::T) -> Result<Self::T>;
+    fn map(&mut self, map: &MapSchema, value: Self::T) -> Result<Self::T>;
+    // There are two representation for iceberg map in avro: array of 
key-value records, or map when keys are strings (optional),
+    // ref: https://iceberg.apache.org/spec/#avro
+    fn map_array(&mut self, array: &RecordSchema, key: Self::T, value: 
Self::T) -> Result<Self::T>;
 
-        fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
-    }
+    fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
+}
 
-    struct AvroSchemaToSchema {
-        next_id: i32,
+/// Visit avro schema in post order visitor.
+pub(crate) fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut 
V) -> Result<V::T> {
+    match schema {
+        AvroSchema::Record(record) => {
+            let field_results = record
+                .fields
+                .iter()
+                .map(|f| visit(&f.schema, visitor))
+                .collect::<Result<Vec<V::T>>>()?;
+
+            visitor.record(record, field_results)
+        }
+        AvroSchema::Union(union) => {
+            let option_results = union
+                .variants()
+                .iter()
+                .map(|f| visit(f, visitor))
+                .collect::<Result<Vec<V::T>>>()?;
+
+            visitor.union(union, option_results)
+        }
+        AvroSchema::Array(item) => {
+            if let Some(logical_type) = item
+                .attributes
+                .get(LOGICAL_TYPE)
+                .and_then(|v| Value::as_str(v))
+            {
+                if logical_type == MAP_LOGICAL_TYPE {
+                    if let AvroSchema::Record(record_schema) = &*item.items {
+                        let key = visit(&record_schema.fields[0].schema, 
visitor)?;
+                        let value = visit(&record_schema.fields[1].schema, 
visitor)?;
+                        return visitor.map_array(record_schema, key, value);
+                    } else {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            "Can't convert avro map schema, item is not a 
record.",
+                        ));
+                    }
+                } else {
+                    return Err(Error::new(
+                        ErrorKind::FeatureUnsupported,
+                        format!(
+                            "Logical type {logical_type} is not support in 
iceberg array type.",
+                        ),
+                    ));
+                }
+            }
+            let item_result = visit(&item.items, visitor)?;
+            visitor.array(item, item_result)
+        }
+        AvroSchema::Map(inner) => {
+            let item_result = visit(&inner.types, visitor)?;
+            visitor.map(inner, item_result)
+        }
+        schema => visitor.primitive(schema),
     }
+}
 
-    impl AvroSchemaToSchema {
-        fn next_field_id(&mut self) -> i32 {
-            self.next_id += 1;
-            self.next_id
-        }
+struct AvroSchemaToSchema;
+
+impl AvroSchemaToSchema {
+    /// A convenient way to get element id(i32) from attributes.
+    #[inline]
+    fn get_element_id_from_attributes(
+        attributes: &BTreeMap<String, Value>,
+        name: &str,
+    ) -> Result<i32> {
+        attributes
+            .get(name)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Can't convert avro array schema, missing element id.",
+                )
+            })?
+            .as_i64()
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Can't convert avro array schema, element id is not a 
valid i64 number.",
+                )
+            })?
+            .try_into()
+            .map_err(|_| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Can't convert avro array schema, element id is not a 
valid i32.",
+                )
+            })
     }
+}
 
-    impl AvroSchemaVisitor for AvroSchemaToSchema {
-        // Only `AvroSchema::Null` will return `None`
-        type T = Option<Type>;
-
-        fn record(
-            &mut self,
-            record: &RecordSchema,
-            field_types: Vec<Option<Type>>,
-        ) -> Result<Option<Type>> {
-            let mut fields = Vec::with_capacity(field_types.len());
-            for (avro_field, typ) in record.fields.iter().zip_eq(field_types) {
-                let field_id = avro_field
-                    .custom_attributes
-                    .get(FILED_ID_PROP)
-                    .and_then(Value::as_i64)
-                    .ok_or_else(|| {
-                        Error::new(
-                            ErrorKind::DataInvalid,
-                            format!("Can't convert field, missing field id: 
{avro_field:?}"),
-                        )
-                    })?;
+impl AvroSchemaVisitor for AvroSchemaToSchema {
+    // Only `AvroSchema::Null` will return `None`
+    type T = Option<Type>;
 
-                let optional = is_avro_optional(&avro_field.schema);
+    fn record(
+        &mut self,
+        record: &RecordSchema,
+        field_types: Vec<Option<Type>>,
+    ) -> Result<Option<Type>> {
+        let mut fields = Vec::with_capacity(field_types.len());
+        for (avro_field, typ) in record.fields.iter().zip_eq(field_types) {
+            let field_id =
+                
Self::get_element_id_from_attributes(&avro_field.custom_attributes, 
FILED_ID_PROP)?;
 
-                let mut field = if optional {
-                    NestedField::optional(field_id as i32, &avro_field.name, 
typ.unwrap())
-                } else {
-                    NestedField::required(field_id as i32, &avro_field.name, 
typ.unwrap())
-                };
+            let optional = is_avro_optional(&avro_field.schema);
 
-                if let Some(doc) = &avro_field.doc {
-                    field = field.with_doc(doc);
-                }
+            let mut field = NestedField::new(field_id, &avro_field.name, 
typ.unwrap(), !optional);
 
-                fields.push(field.into());
+            if let Some(doc) = &avro_field.doc {
+                field = field.with_doc(doc);
             }
 
-            Ok(Some(Type::Struct(StructType::new(fields))))
+            fields.push(field.into());
         }
 
-        fn union(
-            &mut self,
-            union: &UnionSchema,
-            mut options: Vec<Option<Type>>,
-        ) -> Result<Option<Type>> {
+        Ok(Some(Type::Struct(StructType::new(fields))))
+    }
+
+    fn union(
+        &mut self,
+        union: &UnionSchema,
+        mut options: Vec<Option<Type>>,
+    ) -> Result<Option<Type>> {
+        ensure_data_valid!(
+            options.len() <= 2 && !options.is_empty(),
+            "Can't convert avro union type {:?} to iceberg.",
+            union
+        );
+
+        if options.len() > 1 {
             ensure_data_valid!(
-                options.len() <= 2 && !options.is_empty(),
+                options[0].is_none(),
                 "Can't convert avro union type {:?} to iceberg.",
                 union
             );
-
-            if options.len() > 1 {
-                ensure_data_valid!(
-                    options[0].is_none(),
-                    "Can't convert avro union type {:?} to iceberg.",
-                    union
-                );
-            }
-
-            if options.len() == 1 {
-                Ok(Some(options.remove(0).unwrap()))
-            } else {
-                Ok(Some(options.remove(1).unwrap()))
-            }
         }
 
-        fn array(&mut self, array: &AvroSchema, item: Option<Type>) -> 
Result<Self::T> {
-            if let AvroSchema::Array(item_schema) = array {
-                let element_field = NestedField::list_element(
-                    self.next_field_id(),
-                    item.unwrap(),
-                    !is_avro_optional(item_schema),
-                )
-                .into();
-                Ok(Some(Type::List(ListType { element_field })))
-            } else {
-                Err(Error::new(
-                    ErrorKind::Unexpected,
-                    "Expected avro array schema, but {array}",
-                ))
-            }
+        if options.len() == 1 {
+            Ok(Some(options.remove(0).unwrap()))
+        } else {
+            Ok(Some(options.remove(1).unwrap()))
         }
+    }
 
-        fn map(&mut self, map: &AvroSchema, value: Option<Type>) -> 
Result<Option<Type>> {
-            if let AvroSchema::Map(value_schema) = map {
-                // Due to avro rust implementation's limitation, we can't 
store attributes in map schema,
-                // we will fix it later when it has been resolved.
-                let key_field = NestedField::map_key_element(
-                    self.next_field_id(),
-                    Type::Primitive(PrimitiveType::String),
-                );
-                let value_field = NestedField::map_value_element(
-                    self.next_field_id(),
-                    value.unwrap(),
-                    !is_avro_optional(value_schema),
-                );
-                Ok(Some(Type::Map(MapType {
-                    key_field: key_field.into(),
-                    value_field: value_field.into(),
-                })))
-            } else {
-                Err(Error::new(
-                    ErrorKind::Unexpected,
-                    "Expected avro map schema, but {map}",
-                ))
-            }
-        }
+    fn array(&mut self, array: &ArraySchema, item: Option<Type>) -> 
Result<Self::T> {
+        let element_field_id = 
Self::get_element_id_from_attributes(&array.attributes, ELEMENT_ID)?;
+        let element_field = NestedField::list_element(
+            element_field_id,
+            item.unwrap(),
+            !is_avro_optional(&array.items),
+        )
+        .into();
+        Ok(Some(Type::List(ListType { element_field })))
+    }
 
-        fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
-            let typ = match schema {
-                AvroSchema::Decimal(decimal) => {
-                    Type::decimal(decimal.precision as u32, decimal.scale as 
u32)?
-                }
-                AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
-                AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
-                AvroSchema::TimestampMicros => 
Type::Primitive(PrimitiveType::Timestamp),
-                AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
-                AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
-                AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
-                AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
-                AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
-                AvroSchema::String | AvroSchema::Enum(_) => 
Type::Primitive(PrimitiveType::String),
-                AvroSchema::Fixed(fixed) => {
-                    if let Some(logical_type) = 
fixed.attributes.get(LOGICAL_TYPE) {
-                        let logical_type = logical_type.as_str().ok_or_else(|| 
{
-                            Error::new(
-                                ErrorKind::DataInvalid,
-                                "logicalType in attributes of avro schema is 
not a string type",
-                            )
-                        })?;
-                        match logical_type {
-                            UUID_LOGICAL_TYPE => 
Type::Primitive(PrimitiveType::Uuid),
-                            ty => {
-                                return Err(Error::new(
-                                    ErrorKind::FeatureUnsupported,
-                                    format!(
+    fn map(&mut self, map: &MapSchema, value: Option<Type>) -> 
Result<Option<Type>> {
+        let key_field_id = 
Self::get_element_id_from_attributes(&map.attributes, KEY_ID)?;
+        let key_field =
+            NestedField::map_key_element(key_field_id, 
Type::Primitive(PrimitiveType::String));
+        let value_field_id = 
Self::get_element_id_from_attributes(&map.attributes, VALUE_ID)?;
+        let value_field = NestedField::map_value_element(
+            value_field_id,
+            value.unwrap(),
+            !is_avro_optional(&map.types),
+        );
+        Ok(Some(Type::Map(MapType {
+            key_field: key_field.into(),
+            value_field: value_field.into(),
+        })))
+    }
+
+    fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
+        let typ = match schema {
+            AvroSchema::Decimal(decimal) => {
+                Type::decimal(decimal.precision as u32, decimal.scale as u32)?
+            }
+            AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
+            AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
+            AvroSchema::TimestampMicros => 
Type::Primitive(PrimitiveType::Timestamp),
+            AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
+            AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
+            AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
+            AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
+            AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
+            AvroSchema::String | AvroSchema::Enum(_) => 
Type::Primitive(PrimitiveType::String),
+            AvroSchema::Fixed(fixed) => {
+                if let Some(logical_type) = fixed.attributes.get(LOGICAL_TYPE) 
{
+                    let logical_type = logical_type.as_str().ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            "logicalType in attributes of avro schema is not a 
string type",
+                        )
+                    })?;
+                    match logical_type {
+                        UUID_LOGICAL_TYPE => 
Type::Primitive(PrimitiveType::Uuid),
+                        ty => {
+                            return Err(Error::new(
+                                ErrorKind::FeatureUnsupported,
+                                format!(
                                     "Logical type {ty} is not support in 
iceberg primitive type.",
                                 ),
-                                ))
-                            }
+                            ))
                         }
-                    } else {
-                        Type::Primitive(PrimitiveType::Fixed(fixed.size as 
u64))
                     }
+                } else {
+                    Type::Primitive(PrimitiveType::Fixed(fixed.size as u64))
                 }
-                AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
-                AvroSchema::Null => return Ok(None),
-                _ => {
-                    return Err(Error::new(
-                        ErrorKind::Unexpected,
-                        "Unable to convert avro {schema} to iceberg primitive 
type.",
-                    ))
-                }
-            };
-
-            Ok(Some(typ))
-        }
-    }
-
-    /// Visit avro schema in post order visitor.
-    pub(crate) fn visit<V: AvroSchemaVisitor>(
-        schema: &AvroSchema,
-        visitor: &mut V,
-    ) -> Result<V::T> {
-        match schema {
-            AvroSchema::Record(record) => {
-                let field_results = record
-                    .fields
-                    .iter()
-                    .map(|f| visit(&f.schema, visitor))
-                    .collect::<Result<Vec<V::T>>>()?;
-
-                visitor.record(record, field_results)
-            }
-            AvroSchema::Union(union) => {
-                let option_results = union
-                    .variants()
-                    .iter()
-                    .map(|f| visit(f, visitor))
-                    .collect::<Result<Vec<V::T>>>()?;
-
-                visitor.union(union, option_results)
             }
-            AvroSchema::Array(item) => {
-                let item_result = visit(item, visitor)?;
-                visitor.array(schema, item_result)
-            }
-            AvroSchema::Map(inner) => {
-                let item_result = visit(inner, visitor)?;
-                visitor.map(schema, item_result)
-            }
-            schema => visitor.primitive(schema),
-        }
-    }
-    /// Converts avro schema to iceberg schema.
-    pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> 
Result<Schema> {
-        if let AvroSchema::Record(_) = avro_schema {
-            let mut converter = AvroSchemaToSchema { next_id: 0 };
-            let typ =
-                visit(avro_schema, &mut converter)?.expect("Iceberg schema 
should not be none.");
-            if let Type::Struct(s) = typ {
-                Schema::builder()
-                    .with_fields(s.fields().iter().cloned())
-                    .build()
-            } else {
-                Err(Error::new(
+            AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
+            AvroSchema::Null => return Ok(None),
+            _ => {
+                return Err(Error::new(
                     ErrorKind::Unexpected,
-                    format!("Expected to convert avro record schema to struct 
type, but {typ}"),
+                    "Unable to convert avro {schema} to iceberg primitive 
type.",
                 ))
             }
+        };
+
+        Ok(Some(typ))
+    }
+
+    fn map_array(
+        &mut self,
+        array: &RecordSchema,
+        key: Option<Type>,
+        value: Option<Type>,
+    ) -> Result<Self::T> {
+        let key = key.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't convert avro map schema, missing key schema.",
+            )
+        })?;
+        let value = value.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't convert avro map schema, missing value schema.",
+            )
+        })?;
+        let key_id = Self::get_element_id_from_attributes(
+            &array.fields[0].custom_attributes,
+            FILED_ID_PROP,
+        )?;
+        let value_id = Self::get_element_id_from_attributes(
+            &array.fields[1].custom_attributes,
+            FILED_ID_PROP,
+        )?;
+        let key_field = NestedField::map_key_element(key_id, key);
+        let value_field = NestedField::map_value_element(
+            value_id,
+            value,
+            !is_avro_optional(&array.fields[1].schema),
+        );
+        Ok(Some(Type::Map(MapType {
+            key_field: key_field.into(),
+            value_field: value_field.into(),
+        })))
+    }
+}
+
+// # TODO
+// Fix this when we have used `avro_schema_to_schema` inner.
+#[allow(unused)]
+/// Converts avro schema to iceberg schema.
+pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> 
Result<Schema> {
+    if let AvroSchema::Record(_) = avro_schema {
+        let mut converter = AvroSchemaToSchema;
+        let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema 
should not be none.");
+        if let Type::Struct(s) = typ {
+            Schema::builder()
+                .with_fields(s.fields().iter().cloned())
+                .build()
         } else {
             Err(Error::new(
-                ErrorKind::DataInvalid,
-                "Can't convert non record avro schema to iceberg schema: 
{avro_schema}",
+                ErrorKind::Unexpected,
+                format!("Expected to convert avro record schema to struct 
type, but {typ}"),
             ))
         }
+    } else {
+        Err(Error::new(
+            ErrorKind::DataInvalid,
+            "Can't convert non record avro schema to iceberg schema: 
{avro_schema}",
+        ))
     }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::read_to_string;
+    use std::sync::Arc;
+
+    use apache_avro::schema::{Namespace, UnionSchema};
+    use apache_avro::Schema as AvroSchema;
+
+    use super::*;
+    use crate::avro::schema::AvroSchemaToSchema;
+    use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, 
StructType, Type};
 
     fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema {
         let input = read_to_string(format!(
@@ -557,22 +649,27 @@ mod tests {
         AvroSchema::parse_str(input.as_str()).unwrap()
     }
 
-    fn check_schema_conversion(
-        avro_schema: AvroSchema,
-        expected_iceberg_schema: Schema,
-        check_avro_to_iceberg: bool,
-    ) {
-        if check_avro_to_iceberg {
-            let converted_iceberg_schema = 
avro_schema_to_schema(&avro_schema).unwrap();
-            assert_eq!(expected_iceberg_schema, converted_iceberg_schema);
-        }
+    /// Help function to check schema conversion between avro and iceberg:
+    /// 1. avro to iceberg
+    /// 2. iceberg to avro
+    /// 3. iceberg to avro to iceberg back
+    fn check_schema_conversion(avro_schema: AvroSchema, iceberg_schema: 
Schema) {
+        // 1. avro to iceberg
+        let converted_iceberg_schema = 
avro_schema_to_schema(&avro_schema).unwrap();
+        assert_eq!(iceberg_schema, converted_iceberg_schema);
 
+        // 2. iceberg to avro
         let converted_avro_schema = schema_to_avro_schema(
             avro_schema.name().unwrap().fullname(Namespace::None),
-            &expected_iceberg_schema,
+            &iceberg_schema,
         )
         .unwrap();
         assert_eq!(avro_schema, converted_avro_schema);
+
+        // 3.iceberg to avro to iceberg back
+        let converted_avro_converted_iceberg_schema =
+            avro_schema_to_schema(&converted_avro_schema).unwrap();
+        assert_eq!(iceberg_schema, converted_avro_converted_iceberg_schema);
     }
 
     #[test]
@@ -651,7 +748,6 @@ mod tests {
         check_schema_conversion(
             
read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"),
             iceberg_schema,
-            false,
         );
     }
 
@@ -700,7 +796,7 @@ mod tests {
                 .unwrap()
         };
 
-        check_schema_conversion(avro_schema, iceberg_schema, false);
+        check_schema_conversion(avro_schema, iceberg_schema);
     }
 
     #[test]
@@ -749,7 +845,7 @@ mod tests {
                 .unwrap()
         };
 
-        check_schema_conversion(avro_schema, iceberg_schema, false);
+        check_schema_conversion(avro_schema, iceberg_schema);
     }
 
     #[test]
@@ -826,7 +922,144 @@ mod tests {
                 .unwrap()
         };
 
-        check_schema_conversion(avro_schema, iceberg_schema, false);
+        check_schema_conversion(avro_schema, iceberg_schema);
+    }
+
+    #[test]
+    fn test_schema_with_array_map() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+{
+    "type": "record",
+    "name": "avro_schema",
+    "fields": [
+        {
+            "name": "optional",
+            "type": {
+                "type": "array",
+                "items": {
+                    "type": "record",
+                    "name": "k102_v103",
+                    "fields": [
+                        {
+                            "name": "key",
+                            "type": "boolean",
+                            "field-id": 102
+                        },
+                        {
+                            "name": "value",
+                            "type": ["null", "boolean"],
+                            "field-id": 103
+                        }
+                    ]
+                },
+                "default": [],
+                "element-id": 101,
+                "logicalType": "map"
+            },
+            "field-id": 100
+        },{
+            "name": "required",
+            "type": {
+                "type": "array",
+                "items": {
+                    "type": "record",
+                    "name": "k105_v106",
+                    "fields": [
+                        {
+                            "name": "key",
+                            "type": "boolean",
+                            "field-id": 105
+                        },
+                        {
+                            "name": "value",
+                            "type": "boolean",
+                            "field-id": 106
+                        }
+                    ]
+                },
+                "default": [],
+                "logicalType": "map"
+            },
+            "field-id": 104
+        }, {
+            "name": "string_map",
+            "type": {
+                "type": "map",
+                "values": ["null", "long"],
+                "key-id": 108,
+                "value-id": 109
+            },
+            "field-id": 107
+        }
+    ]
+}
+"#,
+            )
+            .unwrap()
+        };
+
+        let iceberg_schema = {
+            Schema::builder()
+                .with_fields(vec![
+                    Arc::new(NestedField::required(
+                        100,
+                        "optional",
+                        Type::Map(MapType {
+                            key_field: NestedField::map_key_element(
+                                102,
+                                PrimitiveType::Boolean.into(),
+                            )
+                            .into(),
+                            value_field: NestedField::map_value_element(
+                                103,
+                                PrimitiveType::Boolean.into(),
+                                false,
+                            )
+                            .into(),
+                        }),
+                    )),
+                    Arc::new(NestedField::required(
+                        104,
+                        "required",
+                        Type::Map(MapType {
+                            key_field: NestedField::map_key_element(
+                                105,
+                                PrimitiveType::Boolean.into(),
+                            )
+                            .into(),
+                            value_field: NestedField::map_value_element(
+                                106,
+                                PrimitiveType::Boolean.into(),
+                                true,
+                            )
+                            .into(),
+                        }),
+                    )),
+                    Arc::new(NestedField::required(
+                        107,
+                        "string_map",
+                        Type::Map(MapType {
+                            key_field: NestedField::map_key_element(
+                                108,
+                                PrimitiveType::String.into(),
+                            )
+                            .into(),
+                            value_field: NestedField::map_value_element(
+                                109,
+                                PrimitiveType::Long.into(),
+                                false,
+                            )
+                            .into(),
+                        }),
+                    )),
+                ])
+                .build()
+                .unwrap()
+        };
+
+        check_schema_conversion(avro_schema, iceberg_schema);
     }
 
     #[test]
@@ -838,7 +1071,7 @@ mod tests {
         ])
         .unwrap();
 
-        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let mut converter = AvroSchemaToSchema;
 
         let options = avro_schema
             .variants()
@@ -850,7 +1083,7 @@ mod tests {
 
     #[test]
     fn test_string_type() {
-        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let mut converter = AvroSchemaToSchema;
         let avro_schema = AvroSchema::String;
 
         assert_eq!(
@@ -875,10 +1108,14 @@ mod tests {
             .unwrap()
         };
 
-        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let AvroSchema::Map(avro_schema) = avro_schema else {
+            unreachable!()
+        };
+
+        let mut converter = AvroSchemaToSchema;
         let iceberg_type = Type::Map(MapType {
-            key_field: NestedField::map_key_element(1, 
PrimitiveType::String.into()).into(),
-            value_field: NestedField::map_value_element(2, 
PrimitiveType::Long.into(), false)
+            key_field: NestedField::map_key_element(101, 
PrimitiveType::String.into()).into(),
+            value_field: NestedField::map_value_element(102, 
PrimitiveType::Long.into(), false)
                 .into(),
         });
 
@@ -902,7 +1139,7 @@ mod tests {
             .unwrap()
         };
 
-        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let mut converter = AvroSchemaToSchema;
 
         let iceberg_type = Type::from(PrimitiveType::Fixed(22));
 
@@ -914,7 +1151,7 @@ mod tests {
 
     #[test]
     fn test_unknown_primitive() {
-        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let mut converter = AvroSchemaToSchema;
 
         assert!(converter.primitive(&AvroSchema::Duration).is_err());
     }
@@ -953,7 +1190,7 @@ mod tests {
             .unwrap()
         };
 
-        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let mut converter = AvroSchemaToSchema;
 
         assert_eq!(
             Type::decimal(25, 19).unwrap(),
@@ -963,7 +1200,7 @@ mod tests {
 
     #[test]
     fn test_date_type() {
-        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let mut converter = AvroSchemaToSchema;
 
         assert_eq!(
             Type::from(PrimitiveType::Date),
diff --git a/crates/iceberg/src/spec/datatypes.rs 
b/crates/iceberg/src/spec/datatypes.rs
index 06dc955..d888387 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -581,6 +581,19 @@ impl From<NestedField> for SerdeNestedField {
 pub type NestedFieldRef = Arc<NestedField>;
 
 impl NestedField {
+    /// Construct a new field.
+    pub fn new(id: i32, name: impl ToString, field_type: Type, required: bool) 
-> Self {
+        Self {
+            id,
+            name: name.to_string(),
+            required,
+            field_type: Box::new(field_type),
+            doc: None,
+            initial_default: None,
+            write_default: None,
+        }
+    }
+
     /// Construct a required field.
     pub fn required(id: i32, name: impl ToString, field_type: Type) -> Self {
         Self {


Reply via email to