This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5bf9f3c fix: complete missing attributes for map && list in avro schema (#411)
5bf9f3c is described below
commit 5bf9f3cbee0320a8c67e320291c306033bacee2c
Author: ZENOTME <[email protected]>
AuthorDate: Thu Aug 8 23:33:12 2024 +0800
fix: complete missing attributes for map && list in avro schema (#411)
* add missing attributes to list/map avro schema
* refine error handling
* fix unused-code warning
* fix typos
* update apache-avro and unit tests
* refine check_schema_conversion
---------
Co-authored-by: ZENOTME <[email protected]>
---
Cargo.toml | 2 +-
crates/iceberg/src/avro/schema.rs | 737 +++++++++++++++++++++++------------
crates/iceberg/src/spec/datatypes.rs | 13 +
3 files changed, 501 insertions(+), 251 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 4826da2..2ec7ef1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,7 +39,7 @@ rust-version = "1.77.1"
[workspace.dependencies]
anyhow = "1.0.72"
-apache-avro = "0.16"
+apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "52" }
arrow-array = { version = "52" }
diff --git a/crates/iceberg/src/avro/schema.rs
b/crates/iceberg/src/avro/schema.rs
index 653f52a..7f81427 100644
--- a/crates/iceberg/src/avro/schema.rs
+++ b/crates/iceberg/src/avro/schema.rs
@@ -19,22 +19,26 @@
use std::collections::BTreeMap;
use apache_avro::schema::{
- DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField,
RecordFieldOrder,
- RecordSchema, UnionSchema,
+ ArraySchema, DecimalSchema, FixedSchema, MapSchema, Name, RecordField as
AvroRecordField,
+ RecordFieldOrder, RecordSchema, UnionSchema,
};
use apache_avro::Schema as AvroSchema;
use itertools::{Either, Itertools};
use serde_json::{Number, Value};
use crate::spec::{
- visit_schema, ListType, MapType, NestedFieldRef, PrimitiveType, Schema,
SchemaVisitor,
- StructType,
+ visit_schema, ListType, MapType, NestedField, NestedFieldRef,
PrimitiveType, Schema,
+ SchemaVisitor, StructType, Type,
};
-use crate::{Error, ErrorKind, Result};
+use crate::{ensure_data_valid, Error, ErrorKind, Result};
+const ELEMENT_ID: &str = "element-id";
const FILED_ID_PROP: &str = "field-id";
+const KEY_ID: &str = "key-id";
+const VALUE_ID: &str = "value-id";
const UUID_BYTES: usize = 16;
const UUID_LOGICAL_TYPE: &str = "uuid";
+const MAP_LOGICAL_TYPE: &str = "map";
// # TODO: https://github.com/apache/iceberg-rust/issues/86
// This const may better to maintain in avro-rs.
const LOGICAL_TYPE: &str = "logicalType";
@@ -124,8 +128,13 @@ impl SchemaVisitor for SchemaToAvroSchema {
field_schema = avro_optional(field_schema)?;
}
- // TODO: We need to add element id prop here, but rust's avro schema
doesn't support property except record schema.
- Ok(Either::Left(AvroSchema::Array(Box::new(field_schema))))
+ Ok(Either::Left(AvroSchema::Array(ArraySchema {
+ items: Box::new(field_schema),
+ attributes: BTreeMap::from([(
+ ELEMENT_ID.to_string(),
+ Value::Number(Number::from(list.element_field.id)),
+ )]),
+ })))
}
fn map(
@@ -141,7 +150,19 @@ impl SchemaVisitor for SchemaToAvroSchema {
}
if matches!(key_field_schema, AvroSchema::String) {
- Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema))))
+ Ok(Either::Left(AvroSchema::Map(MapSchema {
+ types: Box::new(value_field_schema),
+ attributes: BTreeMap::from([
+ (
+ KEY_ID.to_string(),
+ Value::Number(Number::from(map.key_field.id)),
+ ),
+ (
+ VALUE_ID.to_string(),
+ Value::Number(Number::from(map.value_field.id)),
+ ),
+ ]),
+ })))
} else {
// Avro map requires that key must be string type. Here we convert
it to array if key is
// not string type.
@@ -187,7 +208,13 @@ impl SchemaVisitor for SchemaToAvroSchema {
fields,
)?;
- Ok(Either::Left(AvroSchema::Array(item_avro_schema.into())))
+ Ok(Either::Left(AvroSchema::Array(ArraySchema {
+ items: Box::new(item_avro_schema),
+ attributes: BTreeMap::from([(
+ LOGICAL_TYPE.to_string(),
+ Value::String(MAP_LOGICAL_TYPE.to_string()),
+ )]),
+ })))
}
}
@@ -255,6 +282,7 @@ pub(crate) fn avro_fixed_schema(len: usize, logical_type:
Option<&str>) -> Resul
doc: None,
size: len,
attributes,
+ default: None,
}))
}
@@ -274,6 +302,7 @@ pub(crate) fn avro_decimal_schema(precision: usize, scale:
usize) -> Result<Avro
doc: None,
size: crate::spec::Type::decimal_required_bytes(precision as u32)?
as usize,
attributes: Default::default(),
+ default: None,
})),
}))
}
@@ -285,266 +314,329 @@ fn avro_optional(avro_schema: AvroSchema) ->
Result<AvroSchema> {
])?))
}
-#[cfg(test)]
-mod tests {
- use std::fs::read_to_string;
-
- use apache_avro::schema::{Namespace, UnionSchema};
- use apache_avro::Schema as AvroSchema;
-
- use super::*;
- use crate::ensure_data_valid;
- use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema,
StructType, Type};
-
- fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
- match avro_schema {
- AvroSchema::Union(union) => union.is_nullable(),
- _ => false,
- }
+fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
+ match avro_schema {
+ AvroSchema::Union(union) => union.is_nullable(),
+ _ => false,
}
+}
- /// Post order avro schema visitor.
- pub(crate) trait AvroSchemaVisitor {
- type T;
+/// Post order avro schema visitor.
+pub(crate) trait AvroSchemaVisitor {
+ type T;
- fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) ->
Result<Self::T>;
+ fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) ->
Result<Self::T>;
- fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) ->
Result<Self::T>;
+ fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) ->
Result<Self::T>;
- fn array(&mut self, array: &AvroSchema, item: Self::T) ->
Result<Self::T>;
- fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result<Self::T>;
+ fn array(&mut self, array: &ArraySchema, item: Self::T) -> Result<Self::T>;
+ fn map(&mut self, map: &MapSchema, value: Self::T) -> Result<Self::T>;
+ // There are two representation for iceberg map in avro: array of
key-value records, or map when keys are strings (optional),
+ // ref: https://iceberg.apache.org/spec/#avro
+ fn map_array(&mut self, array: &RecordSchema, key: Self::T, value:
Self::T) -> Result<Self::T>;
- fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
- }
+ fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
+}
- struct AvroSchemaToSchema {
- next_id: i32,
+/// Visit avro schema in post order visitor.
+pub(crate) fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut
V) -> Result<V::T> {
+ match schema {
+ AvroSchema::Record(record) => {
+ let field_results = record
+ .fields
+ .iter()
+ .map(|f| visit(&f.schema, visitor))
+ .collect::<Result<Vec<V::T>>>()?;
+
+ visitor.record(record, field_results)
+ }
+ AvroSchema::Union(union) => {
+ let option_results = union
+ .variants()
+ .iter()
+ .map(|f| visit(f, visitor))
+ .collect::<Result<Vec<V::T>>>()?;
+
+ visitor.union(union, option_results)
+ }
+ AvroSchema::Array(item) => {
+ if let Some(logical_type) = item
+ .attributes
+ .get(LOGICAL_TYPE)
+ .and_then(|v| Value::as_str(v))
+ {
+ if logical_type == MAP_LOGICAL_TYPE {
+ if let AvroSchema::Record(record_schema) = &*item.items {
+ let key = visit(&record_schema.fields[0].schema,
visitor)?;
+ let value = visit(&record_schema.fields[1].schema,
visitor)?;
+ return visitor.map_array(record_schema, key, value);
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Can't convert avro map schema, item is not a
record.",
+ ));
+ }
+ } else {
+ return Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ format!(
+ "Logical type {logical_type} is not support in
iceberg array type.",
+ ),
+ ));
+ }
+ }
+ let item_result = visit(&item.items, visitor)?;
+ visitor.array(item, item_result)
+ }
+ AvroSchema::Map(inner) => {
+ let item_result = visit(&inner.types, visitor)?;
+ visitor.map(inner, item_result)
+ }
+ schema => visitor.primitive(schema),
}
+}
- impl AvroSchemaToSchema {
- fn next_field_id(&mut self) -> i32 {
- self.next_id += 1;
- self.next_id
- }
+struct AvroSchemaToSchema;
+
+impl AvroSchemaToSchema {
+ /// A convenient way to get element id(i32) from attributes.
+ #[inline]
+ fn get_element_id_from_attributes(
+ attributes: &BTreeMap<String, Value>,
+ name: &str,
+ ) -> Result<i32> {
+ attributes
+ .get(name)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Can't convert avro array schema, missing element id.",
+ )
+ })?
+ .as_i64()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Can't convert avro array schema, element id is not a
valid i64 number.",
+ )
+ })?
+ .try_into()
+ .map_err(|_| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Can't convert avro array schema, element id is not a
valid i32.",
+ )
+ })
}
+}
- impl AvroSchemaVisitor for AvroSchemaToSchema {
- // Only `AvroSchema::Null` will return `None`
- type T = Option<Type>;
-
- fn record(
- &mut self,
- record: &RecordSchema,
- field_types: Vec<Option<Type>>,
- ) -> Result<Option<Type>> {
- let mut fields = Vec::with_capacity(field_types.len());
- for (avro_field, typ) in record.fields.iter().zip_eq(field_types) {
- let field_id = avro_field
- .custom_attributes
- .get(FILED_ID_PROP)
- .and_then(Value::as_i64)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Can't convert field, missing field id:
{avro_field:?}"),
- )
- })?;
+impl AvroSchemaVisitor for AvroSchemaToSchema {
+ // Only `AvroSchema::Null` will return `None`
+ type T = Option<Type>;
- let optional = is_avro_optional(&avro_field.schema);
+ fn record(
+ &mut self,
+ record: &RecordSchema,
+ field_types: Vec<Option<Type>>,
+ ) -> Result<Option<Type>> {
+ let mut fields = Vec::with_capacity(field_types.len());
+ for (avro_field, typ) in record.fields.iter().zip_eq(field_types) {
+ let field_id =
+
Self::get_element_id_from_attributes(&avro_field.custom_attributes,
FILED_ID_PROP)?;
- let mut field = if optional {
- NestedField::optional(field_id as i32, &avro_field.name,
typ.unwrap())
- } else {
- NestedField::required(field_id as i32, &avro_field.name,
typ.unwrap())
- };
+ let optional = is_avro_optional(&avro_field.schema);
- if let Some(doc) = &avro_field.doc {
- field = field.with_doc(doc);
- }
+ let mut field = NestedField::new(field_id, &avro_field.name,
typ.unwrap(), !optional);
- fields.push(field.into());
+ if let Some(doc) = &avro_field.doc {
+ field = field.with_doc(doc);
}
- Ok(Some(Type::Struct(StructType::new(fields))))
+ fields.push(field.into());
}
- fn union(
- &mut self,
- union: &UnionSchema,
- mut options: Vec<Option<Type>>,
- ) -> Result<Option<Type>> {
+ Ok(Some(Type::Struct(StructType::new(fields))))
+ }
+
+ fn union(
+ &mut self,
+ union: &UnionSchema,
+ mut options: Vec<Option<Type>>,
+ ) -> Result<Option<Type>> {
+ ensure_data_valid!(
+ options.len() <= 2 && !options.is_empty(),
+ "Can't convert avro union type {:?} to iceberg.",
+ union
+ );
+
+ if options.len() > 1 {
ensure_data_valid!(
- options.len() <= 2 && !options.is_empty(),
+ options[0].is_none(),
"Can't convert avro union type {:?} to iceberg.",
union
);
-
- if options.len() > 1 {
- ensure_data_valid!(
- options[0].is_none(),
- "Can't convert avro union type {:?} to iceberg.",
- union
- );
- }
-
- if options.len() == 1 {
- Ok(Some(options.remove(0).unwrap()))
- } else {
- Ok(Some(options.remove(1).unwrap()))
- }
}
- fn array(&mut self, array: &AvroSchema, item: Option<Type>) ->
Result<Self::T> {
- if let AvroSchema::Array(item_schema) = array {
- let element_field = NestedField::list_element(
- self.next_field_id(),
- item.unwrap(),
- !is_avro_optional(item_schema),
- )
- .into();
- Ok(Some(Type::List(ListType { element_field })))
- } else {
- Err(Error::new(
- ErrorKind::Unexpected,
- "Expected avro array schema, but {array}",
- ))
- }
+ if options.len() == 1 {
+ Ok(Some(options.remove(0).unwrap()))
+ } else {
+ Ok(Some(options.remove(1).unwrap()))
}
+ }
- fn map(&mut self, map: &AvroSchema, value: Option<Type>) ->
Result<Option<Type>> {
- if let AvroSchema::Map(value_schema) = map {
- // Due to avro rust implementation's limitation, we can't
store attributes in map schema,
- // we will fix it later when it has been resolved.
- let key_field = NestedField::map_key_element(
- self.next_field_id(),
- Type::Primitive(PrimitiveType::String),
- );
- let value_field = NestedField::map_value_element(
- self.next_field_id(),
- value.unwrap(),
- !is_avro_optional(value_schema),
- );
- Ok(Some(Type::Map(MapType {
- key_field: key_field.into(),
- value_field: value_field.into(),
- })))
- } else {
- Err(Error::new(
- ErrorKind::Unexpected,
- "Expected avro map schema, but {map}",
- ))
- }
- }
+ fn array(&mut self, array: &ArraySchema, item: Option<Type>) ->
Result<Self::T> {
+ let element_field_id =
Self::get_element_id_from_attributes(&array.attributes, ELEMENT_ID)?;
+ let element_field = NestedField::list_element(
+ element_field_id,
+ item.unwrap(),
+ !is_avro_optional(&array.items),
+ )
+ .into();
+ Ok(Some(Type::List(ListType { element_field })))
+ }
- fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
- let typ = match schema {
- AvroSchema::Decimal(decimal) => {
- Type::decimal(decimal.precision as u32, decimal.scale as
u32)?
- }
- AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
- AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
- AvroSchema::TimestampMicros =>
Type::Primitive(PrimitiveType::Timestamp),
- AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
- AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
- AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
- AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
- AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
- AvroSchema::String | AvroSchema::Enum(_) =>
Type::Primitive(PrimitiveType::String),
- AvroSchema::Fixed(fixed) => {
- if let Some(logical_type) =
fixed.attributes.get(LOGICAL_TYPE) {
- let logical_type = logical_type.as_str().ok_or_else(||
{
- Error::new(
- ErrorKind::DataInvalid,
- "logicalType in attributes of avro schema is
not a string type",
- )
- })?;
- match logical_type {
- UUID_LOGICAL_TYPE =>
Type::Primitive(PrimitiveType::Uuid),
- ty => {
- return Err(Error::new(
- ErrorKind::FeatureUnsupported,
- format!(
+ fn map(&mut self, map: &MapSchema, value: Option<Type>) ->
Result<Option<Type>> {
+ let key_field_id =
Self::get_element_id_from_attributes(&map.attributes, KEY_ID)?;
+ let key_field =
+ NestedField::map_key_element(key_field_id,
Type::Primitive(PrimitiveType::String));
+ let value_field_id =
Self::get_element_id_from_attributes(&map.attributes, VALUE_ID)?;
+ let value_field = NestedField::map_value_element(
+ value_field_id,
+ value.unwrap(),
+ !is_avro_optional(&map.types),
+ );
+ Ok(Some(Type::Map(MapType {
+ key_field: key_field.into(),
+ value_field: value_field.into(),
+ })))
+ }
+
+ fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
+ let typ = match schema {
+ AvroSchema::Decimal(decimal) => {
+ Type::decimal(decimal.precision as u32, decimal.scale as u32)?
+ }
+ AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
+ AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
+ AvroSchema::TimestampMicros =>
Type::Primitive(PrimitiveType::Timestamp),
+ AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
+ AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
+ AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
+ AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
+ AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
+ AvroSchema::String | AvroSchema::Enum(_) =>
Type::Primitive(PrimitiveType::String),
+ AvroSchema::Fixed(fixed) => {
+ if let Some(logical_type) = fixed.attributes.get(LOGICAL_TYPE)
{
+ let logical_type = logical_type.as_str().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "logicalType in attributes of avro schema is not a
string type",
+ )
+ })?;
+ match logical_type {
+ UUID_LOGICAL_TYPE =>
Type::Primitive(PrimitiveType::Uuid),
+ ty => {
+ return Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ format!(
"Logical type {ty} is not support in
iceberg primitive type.",
),
- ))
- }
+ ))
}
- } else {
- Type::Primitive(PrimitiveType::Fixed(fixed.size as
u64))
}
+ } else {
+ Type::Primitive(PrimitiveType::Fixed(fixed.size as u64))
}
- AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
- AvroSchema::Null => return Ok(None),
- _ => {
- return Err(Error::new(
- ErrorKind::Unexpected,
- "Unable to convert avro {schema} to iceberg primitive
type.",
- ))
- }
- };
-
- Ok(Some(typ))
- }
- }
-
- /// Visit avro schema in post order visitor.
- pub(crate) fn visit<V: AvroSchemaVisitor>(
- schema: &AvroSchema,
- visitor: &mut V,
- ) -> Result<V::T> {
- match schema {
- AvroSchema::Record(record) => {
- let field_results = record
- .fields
- .iter()
- .map(|f| visit(&f.schema, visitor))
- .collect::<Result<Vec<V::T>>>()?;
-
- visitor.record(record, field_results)
- }
- AvroSchema::Union(union) => {
- let option_results = union
- .variants()
- .iter()
- .map(|f| visit(f, visitor))
- .collect::<Result<Vec<V::T>>>()?;
-
- visitor.union(union, option_results)
}
- AvroSchema::Array(item) => {
- let item_result = visit(item, visitor)?;
- visitor.array(schema, item_result)
- }
- AvroSchema::Map(inner) => {
- let item_result = visit(inner, visitor)?;
- visitor.map(schema, item_result)
- }
- schema => visitor.primitive(schema),
- }
- }
- /// Converts avro schema to iceberg schema.
- pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) ->
Result<Schema> {
- if let AvroSchema::Record(_) = avro_schema {
- let mut converter = AvroSchemaToSchema { next_id: 0 };
- let typ =
- visit(avro_schema, &mut converter)?.expect("Iceberg schema
should not be none.");
- if let Type::Struct(s) = typ {
- Schema::builder()
- .with_fields(s.fields().iter().cloned())
- .build()
- } else {
- Err(Error::new(
+ AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
+ AvroSchema::Null => return Ok(None),
+ _ => {
+ return Err(Error::new(
ErrorKind::Unexpected,
- format!("Expected to convert avro record schema to struct
type, but {typ}"),
+ "Unable to convert avro {schema} to iceberg primitive
type.",
))
}
+ };
+
+ Ok(Some(typ))
+ }
+
+ fn map_array(
+ &mut self,
+ array: &RecordSchema,
+ key: Option<Type>,
+ value: Option<Type>,
+ ) -> Result<Self::T> {
+ let key = key.ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Can't convert avro map schema, missing key schema.",
+ )
+ })?;
+ let value = value.ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Can't convert avro map schema, missing value schema.",
+ )
+ })?;
+ let key_id = Self::get_element_id_from_attributes(
+ &array.fields[0].custom_attributes,
+ FILED_ID_PROP,
+ )?;
+ let value_id = Self::get_element_id_from_attributes(
+ &array.fields[1].custom_attributes,
+ FILED_ID_PROP,
+ )?;
+ let key_field = NestedField::map_key_element(key_id, key);
+ let value_field = NestedField::map_value_element(
+ value_id,
+ value,
+ !is_avro_optional(&array.fields[1].schema),
+ );
+ Ok(Some(Type::Map(MapType {
+ key_field: key_field.into(),
+ value_field: value_field.into(),
+ })))
+ }
+}
+
+// # TODO
+// Fix this when we have used `avro_schema_to_schema` inner.
+#[allow(unused)]
+/// Converts avro schema to iceberg schema.
+pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) ->
Result<Schema> {
+ if let AvroSchema::Record(_) = avro_schema {
+ let mut converter = AvroSchemaToSchema;
+ let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema
should not be none.");
+ if let Type::Struct(s) = typ {
+ Schema::builder()
+ .with_fields(s.fields().iter().cloned())
+ .build()
} else {
Err(Error::new(
- ErrorKind::DataInvalid,
- "Can't convert non record avro schema to iceberg schema:
{avro_schema}",
+ ErrorKind::Unexpected,
+ format!("Expected to convert avro record schema to struct
type, but {typ}"),
))
}
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Can't convert non record avro schema to iceberg schema:
{avro_schema}",
+ ))
}
+}
+
+#[cfg(test)]
+mod tests {
+ use std::fs::read_to_string;
+ use std::sync::Arc;
+
+ use apache_avro::schema::{Namespace, UnionSchema};
+ use apache_avro::Schema as AvroSchema;
+
+ use super::*;
+ use crate::avro::schema::AvroSchemaToSchema;
+ use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema,
StructType, Type};
fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema {
let input = read_to_string(format!(
@@ -557,22 +649,27 @@ mod tests {
AvroSchema::parse_str(input.as_str()).unwrap()
}
- fn check_schema_conversion(
- avro_schema: AvroSchema,
- expected_iceberg_schema: Schema,
- check_avro_to_iceberg: bool,
- ) {
- if check_avro_to_iceberg {
- let converted_iceberg_schema =
avro_schema_to_schema(&avro_schema).unwrap();
- assert_eq!(expected_iceberg_schema, converted_iceberg_schema);
- }
+ /// Help function to check schema conversion between avro and iceberg:
+ /// 1. avro to iceberg
+ /// 2. iceberg to avro
+ /// 3. iceberg to avro to iceberg back
+ fn check_schema_conversion(avro_schema: AvroSchema, iceberg_schema:
Schema) {
+ // 1. avro to iceberg
+ let converted_iceberg_schema =
avro_schema_to_schema(&avro_schema).unwrap();
+ assert_eq!(iceberg_schema, converted_iceberg_schema);
+ // 2. iceberg to avro
let converted_avro_schema = schema_to_avro_schema(
avro_schema.name().unwrap().fullname(Namespace::None),
- &expected_iceberg_schema,
+ &iceberg_schema,
)
.unwrap();
assert_eq!(avro_schema, converted_avro_schema);
+
+ // 3.iceberg to avro to iceberg back
+ let converted_avro_converted_iceberg_schema =
+ avro_schema_to_schema(&converted_avro_schema).unwrap();
+ assert_eq!(iceberg_schema, converted_avro_converted_iceberg_schema);
}
#[test]
@@ -651,7 +748,6 @@ mod tests {
check_schema_conversion(
read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"),
iceberg_schema,
- false,
);
}
@@ -700,7 +796,7 @@ mod tests {
.unwrap()
};
- check_schema_conversion(avro_schema, iceberg_schema, false);
+ check_schema_conversion(avro_schema, iceberg_schema);
}
#[test]
@@ -749,7 +845,7 @@ mod tests {
.unwrap()
};
- check_schema_conversion(avro_schema, iceberg_schema, false);
+ check_schema_conversion(avro_schema, iceberg_schema);
}
#[test]
@@ -826,7 +922,144 @@ mod tests {
.unwrap()
};
- check_schema_conversion(avro_schema, iceberg_schema, false);
+ check_schema_conversion(avro_schema, iceberg_schema);
+ }
+
+ #[test]
+ fn test_schema_with_array_map() {
+ let avro_schema = {
+ AvroSchema::parse_str(
+ r#"
+{
+ "type": "record",
+ "name": "avro_schema",
+ "fields": [
+ {
+ "name": "optional",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k102_v103",
+ "fields": [
+ {
+ "name": "key",
+ "type": "boolean",
+ "field-id": 102
+ },
+ {
+ "name": "value",
+ "type": ["null", "boolean"],
+ "field-id": 103
+ }
+ ]
+ },
+ "default": [],
+ "element-id": 101,
+ "logicalType": "map"
+ },
+ "field-id": 100
+ },{
+ "name": "required",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k105_v106",
+ "fields": [
+ {
+ "name": "key",
+ "type": "boolean",
+ "field-id": 105
+ },
+ {
+ "name": "value",
+ "type": "boolean",
+ "field-id": 106
+ }
+ ]
+ },
+ "default": [],
+ "logicalType": "map"
+ },
+ "field-id": 104
+ }, {
+ "name": "string_map",
+ "type": {
+ "type": "map",
+ "values": ["null", "long"],
+ "key-id": 108,
+ "value-id": 109
+ },
+ "field-id": 107
+ }
+ ]
+}
+"#,
+ )
+ .unwrap()
+ };
+
+ let iceberg_schema = {
+ Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::required(
+ 100,
+ "optional",
+ Type::Map(MapType {
+ key_field: NestedField::map_key_element(
+ 102,
+ PrimitiveType::Boolean.into(),
+ )
+ .into(),
+ value_field: NestedField::map_value_element(
+ 103,
+ PrimitiveType::Boolean.into(),
+ false,
+ )
+ .into(),
+ }),
+ )),
+ Arc::new(NestedField::required(
+ 104,
+ "required",
+ Type::Map(MapType {
+ key_field: NestedField::map_key_element(
+ 105,
+ PrimitiveType::Boolean.into(),
+ )
+ .into(),
+ value_field: NestedField::map_value_element(
+ 106,
+ PrimitiveType::Boolean.into(),
+ true,
+ )
+ .into(),
+ }),
+ )),
+ Arc::new(NestedField::required(
+ 107,
+ "string_map",
+ Type::Map(MapType {
+ key_field: NestedField::map_key_element(
+ 108,
+ PrimitiveType::String.into(),
+ )
+ .into(),
+ value_field: NestedField::map_value_element(
+ 109,
+ PrimitiveType::Long.into(),
+ false,
+ )
+ .into(),
+ }),
+ )),
+ ])
+ .build()
+ .unwrap()
+ };
+
+ check_schema_conversion(avro_schema, iceberg_schema);
}
#[test]
@@ -838,7 +1071,7 @@ mod tests {
])
.unwrap();
- let mut converter = AvroSchemaToSchema { next_id: 0 };
+ let mut converter = AvroSchemaToSchema;
let options = avro_schema
.variants()
@@ -850,7 +1083,7 @@ mod tests {
#[test]
fn test_string_type() {
- let mut converter = AvroSchemaToSchema { next_id: 0 };
+ let mut converter = AvroSchemaToSchema;
let avro_schema = AvroSchema::String;
assert_eq!(
@@ -875,10 +1108,14 @@ mod tests {
.unwrap()
};
- let mut converter = AvroSchemaToSchema { next_id: 0 };
+ let AvroSchema::Map(avro_schema) = avro_schema else {
+ unreachable!()
+ };
+
+ let mut converter = AvroSchemaToSchema;
let iceberg_type = Type::Map(MapType {
- key_field: NestedField::map_key_element(1,
PrimitiveType::String.into()).into(),
- value_field: NestedField::map_value_element(2,
PrimitiveType::Long.into(), false)
+ key_field: NestedField::map_key_element(101,
PrimitiveType::String.into()).into(),
+ value_field: NestedField::map_value_element(102,
PrimitiveType::Long.into(), false)
.into(),
});
@@ -902,7 +1139,7 @@ mod tests {
.unwrap()
};
- let mut converter = AvroSchemaToSchema { next_id: 0 };
+ let mut converter = AvroSchemaToSchema;
let iceberg_type = Type::from(PrimitiveType::Fixed(22));
@@ -914,7 +1151,7 @@ mod tests {
#[test]
fn test_unknown_primitive() {
- let mut converter = AvroSchemaToSchema { next_id: 0 };
+ let mut converter = AvroSchemaToSchema;
assert!(converter.primitive(&AvroSchema::Duration).is_err());
}
@@ -953,7 +1190,7 @@ mod tests {
.unwrap()
};
- let mut converter = AvroSchemaToSchema { next_id: 0 };
+ let mut converter = AvroSchemaToSchema;
assert_eq!(
Type::decimal(25, 19).unwrap(),
@@ -963,7 +1200,7 @@ mod tests {
#[test]
fn test_date_type() {
- let mut converter = AvroSchemaToSchema { next_id: 0 };
+ let mut converter = AvroSchemaToSchema;
assert_eq!(
Type::from(PrimitiveType::Date),
diff --git a/crates/iceberg/src/spec/datatypes.rs
b/crates/iceberg/src/spec/datatypes.rs
index 06dc955..d888387 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -581,6 +581,19 @@ impl From<NestedField> for SerdeNestedField {
pub type NestedFieldRef = Arc<NestedField>;
impl NestedField {
+ /// Construct a new field.
+ pub fn new(id: i32, name: impl ToString, field_type: Type, required: bool)
-> Self {
+ Self {
+ id,
+ name: name.to_string(),
+ required,
+ field_type: Box::new(field_type),
+ doc: None,
+ initial_default: None,
+ write_default: None,
+ }
+ }
+
/// Construct a required field.
pub fn required(id: i32, name: impl ToString, field_type: Type) -> Self {
Self {