This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 10cc7c9 feat: Introduce conversion between iceberg schema and avro
schema (#40)
10cc7c9 is described below
commit 10cc7c9ac66777ce91464d3454c2bfb68dbead1a
Author: Renjie Liu <[email protected]>
AuthorDate: Thu Aug 31 19:28:43 2023 +0800
feat: Introduce conversion between iceberg schema and avro schema (#40)
* feat: Introduce conversion between iceberg schema and avro schema
* Fix typo
* Fix comments
* Fix clippy
* Calculate decimal bytes
---
crates/iceberg/Cargo.toml | 4 +-
crates/iceberg/src/{lib.rs => avro/mod.rs} | 16 +-
crates/iceberg/src/avro/schema.rs | 913 +++++++++++++++++++++
crates/iceberg/src/error.rs | 6 +
crates/iceberg/src/lib.rs | 1 +
crates/iceberg/src/spec/datatypes.rs | 132 ++-
.../testdata/avro_schema_manifest_entry.json | 286 +++++++
.../testdata/avro_schema_manifest_file_v1.json | 139 ++++
.../testdata/avro_schema_manifest_file_v2.json | 141 ++++
9 files changed, 1614 insertions(+), 24 deletions(-)
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 053acf8..665ccad 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -27,7 +27,7 @@ license = "Apache-2.0"
keywords = ["iceberg"]
[dependencies]
-apache-avro = "0.15.0"
+apache-avro = "0.15"
serde = {version = "^1.0", features = ["rc"]}
serde_bytes = "0.11.8"
serde_json = "^1.0"
@@ -43,6 +43,8 @@ serde_repr = "0.1.16"
itertools = "0.11"
bimap = "0.6"
derive_builder = "0.12.0"
+either = "1"
+lazy_static = "1"
[dev-dependencies]
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/avro/mod.rs
similarity index 78%
copy from crates/iceberg/src/lib.rs
copy to crates/iceberg/src/avro/mod.rs
index 8e77b97..a6bb073 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/avro/mod.rs
@@ -15,16 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-//! Native Rust implementation of Apache Iceberg
-
-#![deny(missing_docs)]
-
-#[macro_use]
-extern crate derive_builder;
-
-mod error;
-pub use error::Error;
-pub use error::ErrorKind;
-pub use error::Result;
-
-pub mod spec;
+//! Avro-related code.
+#[allow(dead_code)] // NOTE(review): presumably needed because nothing calls the `pub(crate)` converters in `schema` yet - confirm, and drop once they have callers.
+mod schema;
diff --git a/crates/iceberg/src/avro/schema.rs
b/crates/iceberg/src/avro/schema.rs
new file mode 100644
index 0000000..244d208
--- /dev/null
+++ b/crates/iceberg/src/avro/schema.rs
@@ -0,0 +1,913 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Conversion between iceberg and avro schema.
+use crate::spec::{
+ visit_schema, ListType, MapType, NestedField, NestedFieldRef,
PrimitiveType, Schema,
+ SchemaVisitor, StructType, Type,
+};
+use crate::{ensure_data_valid, Error, ErrorKind, Result};
+use apache_avro::schema::{
+ DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField,
RecordFieldOrder,
+ RecordSchema, UnionSchema,
+};
+use apache_avro::Schema as AvroSchema;
+use itertools::{Either, Itertools};
+use serde_json::{Number, Value};
+
+const FILED_ID_PROP: &str = "field-id"; // Custom-attribute key under which a field's iceberg id is stored on avro record fields.
+
+struct SchemaToAvroSchema { // Iceberg schema visitor that builds the corresponding avro schema.
+ schema: String, // Name assigned to the top-level avro record.
+}
+
+type AvroSchemaOrField = Either<AvroSchema, AvroRecordField>; // Visitor result: `Left` holds a converted type, `Right` a converted struct field.
+
+impl SchemaVisitor for SchemaToAvroSchema {
+ type T = AvroSchemaOrField;
+
+ fn schema(&mut self, _schema: &Schema, value: AvroSchemaOrField) ->
Result<AvroSchemaOrField> {
+ let mut avro_schema = value.unwrap_left();
+
+ if let AvroSchema::Record(record) = &mut avro_schema {
+ record.name = Name::from(self.schema.as_str());
+ } else {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Schema result must be avro record!",
+ ));
+ }
+
+ Ok(Either::Left(avro_schema))
+ }
+
+ fn field(
+ &mut self,
+ field: &NestedFieldRef,
+ avro_schema: AvroSchemaOrField,
+ ) -> Result<AvroSchemaOrField> {
+ let mut field_schema = avro_schema.unwrap_left();
+ if let AvroSchema::Record(record) = &mut field_schema {
+ record.name = Name::from(format!("r{}", field.id).as_str());
+ }
+
+ if !field.required {
+ field_schema = avro_optional(field_schema)?;
+ }
+
+ let mut avro_record_field = AvroRecordField {
+ name: field.name.clone(),
+ schema: field_schema,
+ order: RecordFieldOrder::Ignore,
+ position: 0,
+ doc: field.doc.clone(),
+ aliases: None,
+ default: None,
+ custom_attributes: Default::default(),
+ };
+
+ if !field.required {
+ avro_record_field.default = Some(Value::Null);
+ }
+ avro_record_field.custom_attributes.insert(
+ FILED_ID_PROP.to_string(),
+ Value::Number(Number::from(field.id)),
+ );
+
+ Ok(Either::Right(avro_record_field))
+ }
+
+ fn r#struct(
+ &mut self,
+ _struct: &StructType,
+ results: Vec<AvroSchemaOrField>,
+ ) -> Result<AvroSchemaOrField> {
+ let avro_fields = results.into_iter().map(|r|
r.unwrap_right()).collect();
+
+ Ok(Either::Left(AvroSchema::Record(RecordSchema {
+ // The name of this record schema should be determined later, by
schema name or field
+ // name, here we use a temporary placeholder to do it.
+ name: Name::new("null")?,
+ aliases: None,
+ doc: None,
+ fields: avro_fields,
+ lookup: Default::default(),
+ attributes: Default::default(),
+ })))
+ }
+
+ fn list(&mut self, list: &ListType, value: AvroSchemaOrField) ->
Result<AvroSchemaOrField> {
+ let mut field_schema = value.unwrap_left();
+
+ if let AvroSchema::Record(record) = &mut field_schema {
+ record.name = Name::from(format!("r{}",
list.element_field.id).as_str());
+ }
+
+ if !list.element_field.required {
+ field_schema = avro_optional(field_schema)?;
+ }
+
+ // TODO: We need to add element id prop here, but rust's avro schema
doesn't support property except record schema.
+ Ok(Either::Left(AvroSchema::Array(Box::new(field_schema))))
+ }
+
+ fn map(
+ &mut self,
+ map: &MapType,
+ key_value: AvroSchemaOrField,
+ value: AvroSchemaOrField,
+ ) -> Result<AvroSchemaOrField> {
+ let key_field_schema = key_value.unwrap_left();
+ let mut value_field_schema = value.unwrap_left();
+ if !map.value_field.required {
+ value_field_schema = avro_optional(value_field_schema)?;
+ }
+
+ if matches!(key_field_schema, AvroSchema::String) {
+ Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema))))
+ } else {
+ // Avro map requires that key must be string type. Here we convert
it to array if key is
+ // not string type.
+ let key_field = {
+ let mut field = AvroRecordField {
+ name: map.key_field.name.clone(),
+ doc: None,
+ aliases: None,
+ default: None,
+ schema: key_field_schema,
+ order: RecordFieldOrder::Ascending,
+ position: 0,
+ custom_attributes: Default::default(),
+ };
+ field.custom_attributes.insert(
+ FILED_ID_PROP.to_string(),
+ Value::Number(Number::from(map.key_field.id)),
+ );
+ field
+ };
+
+ let value_field = {
+ let mut field = AvroRecordField {
+ name: map.key_field.name.clone(),
+ doc: None,
+ aliases: None,
+ default: None,
+ schema: value_field_schema,
+ order: RecordFieldOrder::Ignore,
+ position: 0,
+ custom_attributes: Default::default(),
+ };
+ field.custom_attributes.insert(
+ FILED_ID_PROP.to_string(),
+ Value::Number(Number::from(map.value_field.id)),
+ );
+ field
+ };
+
+ let item_avro_schema = AvroSchema::Record(RecordSchema {
+ name: Name::from(format!("k{}_v{}", map.key_field.id,
map.value_field.id).as_str()),
+ aliases: None,
+ doc: None,
+ fields: vec![key_field, value_field],
+ lookup: Default::default(),
+ attributes: Default::default(),
+ });
+
+ Ok(Either::Left(item_avro_schema))
+ }
+ }
+
+ fn primitive(&mut self, p: &PrimitiveType) -> Result<AvroSchemaOrField> {
+ let avro_schema = match p {
+ PrimitiveType::Boolean => AvroSchema::Boolean,
+ PrimitiveType::Int => AvroSchema::Int,
+ PrimitiveType::Long => AvroSchema::Long,
+ PrimitiveType::Float => AvroSchema::Float,
+ PrimitiveType::Double => AvroSchema::Double,
+ PrimitiveType::Date => AvroSchema::Date,
+ PrimitiveType::Time => AvroSchema::TimeMicros,
+ PrimitiveType::Timestamp => AvroSchema::TimestampMicros,
+ PrimitiveType::Timestamptz => AvroSchema::TimestampMicros,
+ PrimitiveType::String => AvroSchema::String,
+ PrimitiveType::Uuid => AvroSchema::Uuid,
+ PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?,
+ PrimitiveType::Binary => AvroSchema::Bytes,
+ PrimitiveType::Decimal { precision, scale } => {
+ avro_decimal_schema(*precision as usize, *scale as usize)?
+ }
+ };
+ Ok(Either::Left(avro_schema))
+ }
+}
+
+/// Builds the avro schema for the iceberg `schema`; the top-level avro record is named `name`.
+pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result<AvroSchema> {
+    let mut visitor = SchemaToAvroSchema {
+        schema: name.to_string(),
+    };
+    let converted = visit_schema(schema, &mut visitor)?;
+
+    // The root callback always yields a schema, never a record field.
+    Ok(converted.unwrap_left())
+}
+
+/// Creates an avro fixed schema of `len` bytes, named `fixed_{len}`.
+pub(crate) fn avro_fixed_schema(len: usize) -> Result<AvroSchema> {
+    let fixed_name = Name::new(&format!("fixed_{len}"))?;
+    let fixed = FixedSchema {
+        name: fixed_name,
+        aliases: None,
+        doc: None,
+        size: len,
+        attributes: Default::default(),
+    };
+    Ok(AvroSchema::Fixed(fixed))
+}
+
+/// Creates an avro decimal schema backed by a fixed schema whose size is the
+/// number of bytes `Type::decimal_required_bytes` reports for `precision`.
+pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<AvroSchema> {
+    let byte_len = Type::decimal_required_bytes(precision as u32)? as usize;
+    let inner = avro_fixed_schema(byte_len)?;
+
+    Ok(AvroSchema::Decimal(DecimalSchema {
+        precision,
+        scale,
+        inner: Box::new(inner),
+    }))
+}
+
+/// Wraps `avro_schema` into the union `[null, avro_schema]`.
+fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
+    let variants = vec![AvroSchema::Null, avro_schema];
+    let union = UnionSchema::new(variants)?;
+    Ok(AvroSchema::Union(union))
+}
+
+/// Returns `true` only for a union schema that reports itself as nullable.
+fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
+    matches!(avro_schema, AvroSchema::Union(union) if union.is_nullable())
+}
+
+/// Post-order avro schema visitor: `visit` converts child schemas before calling back for their parent.
+pub(crate) trait AvroSchemaVisitor {
+ type T; // Result produced for every visited schema node.
+
+ fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) ->
Result<Self::T>; // `fields` holds one result per record field, in field order.
+
+ fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) ->
Result<Self::T>; // `options` holds one result per union variant, in variant order.
+
+ fn array(&mut self, array: &AvroSchema, item: Self::T) -> Result<Self::T>; // `array` is the whole array schema, `item` the result for its element schema.
+ fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result<Self::T>; // `map` is the whole map schema, `value` the result for its value schema.
+
+ fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>; // Called for every schema that is not a record, union, array or map.
+}
+
+/// Walks `schema` in post order: children are visited first, then the matching
+/// callback of `visitor` is invoked with their results. The first error stops the walk.
+pub(crate) fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut V) -> Result<V::T> {
+    match schema {
+        AvroSchema::Record(record) => {
+            let mut children = Vec::with_capacity(record.fields.len());
+            for field in &record.fields {
+                children.push(visit(&field.schema, visitor)?);
+            }
+            visitor.record(record, children)
+        }
+        AvroSchema::Union(union) => {
+            let variants = union.variants();
+            let mut children = Vec::with_capacity(variants.len());
+            for variant in variants {
+                children.push(visit(variant, visitor)?);
+            }
+            visitor.union(union, children)
+        }
+        AvroSchema::Array(element) => {
+            let element_result = visit(element, visitor)?;
+            visitor.array(schema, element_result)
+        }
+        AvroSchema::Map(values) => {
+            let value_result = visit(values, visitor)?;
+            visitor.map(schema, value_result)
+        }
+        // Everything without nested schemas handled above goes to `primitive`.
+        other => visitor.primitive(other),
+    }
+}
+
+/// Avro-to-iceberg converter state.
+struct AvroSchemaToSchema {
+    /// Last id handed out by `next_field_id`.
+    next_id: i32,
+}
+
+impl AvroSchemaToSchema {
+    /// Advances the counter by one and returns the new value.
+    fn next_field_id(&mut self) -> i32 {
+        let id = self.next_id + 1;
+        self.next_id = id;
+        id
+    }
+}
+
+/// Avro-to-iceberg conversion, driven by `visit`.
+///
+/// Malformed input (missing or out-of-range field ids, null-typed fields or
+/// elements, unsupported unions) is reported as an error instead of panicking.
+impl AvroSchemaVisitor for AvroSchemaToSchema {
+    // Only `AvroSchema::Null` will return `None`
+    type T = Option<Type>;
+
+    /// Converts a record into a struct type. Every avro field must carry an
+    /// integer `field-id` attribute and have a non-null type.
+    fn record(
+        &mut self,
+        record: &RecordSchema,
+        field_types: Vec<Option<Type>>,
+    ) -> Result<Option<Type>> {
+        let mut fields = Vec::with_capacity(field_types.len());
+        for (avro_field, typ) in record.fields.iter().zip_eq(field_types) {
+            let raw_id = avro_field
+                .custom_attributes
+                .get(FILED_ID_PROP)
+                .and_then(Value::as_i64)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Can't convert field, missing field id: {avro_field:?}"),
+                    )
+                })?;
+            // Iceberg field ids are `i32`; reject ids that would be truncated by a cast.
+            let field_id = <i32 as std::convert::TryFrom<i64>>::try_from(raw_id).map_err(|_| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Can't convert field, field id {raw_id} is out of range: {avro_field:?}"),
+                )
+            })?;
+            // `None` means the avro type was plain `null`, which has no iceberg counterpart.
+            let typ = typ.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Can't convert field with null type: {avro_field:?}"),
+                )
+            })?;
+
+            let optional = is_avro_optional(&avro_field.schema);
+
+            let mut field = if optional {
+                NestedField::optional(field_id, &avro_field.name, typ)
+            } else {
+                NestedField::required(field_id, &avro_field.name, typ)
+            };
+
+            if let Some(doc) = &avro_field.doc {
+                field = field.with_doc(doc);
+            }
+
+            fields.push(field.into());
+        }
+
+        Ok(Some(Type::Struct(StructType::new(fields))))
+    }
+
+    /// Resolves a union to its value type. Only a single non-null variant or
+    /// `[null, T]` is supported.
+    fn union(
+        &mut self,
+        union: &UnionSchema,
+        mut options: Vec<Option<Type>>,
+    ) -> Result<Option<Type>> {
+        ensure_data_valid!(
+            options.len() <= 2 && !options.is_empty(),
+            "Can't convert avro union type {:?} to iceberg.",
+            union
+        );
+
+        if options.len() > 1 {
+            ensure_data_valid!(
+                options[0].is_none(),
+                "Can't convert avro union type {:?} to iceberg.",
+                union
+            );
+        }
+
+        // The value type is the last variant: the only one, or the one following `null`.
+        match options.pop().flatten() {
+            Some(typ) => Ok(Some(typ)),
+            // Previously this case panicked on `unwrap()`.
+            None => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Can't convert avro union type {union:?} to iceberg."),
+            )),
+        }
+    }
+
+    /// Converts an array into a list type; the element gets a freshly generated id.
+    fn array(&mut self, array: &AvroSchema, item: Option<Type>) -> Result<Self::T> {
+        if let AvroSchema::Array(item_schema) = array {
+            let element_type = item.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Can't convert avro array with null element type: {array:?}"),
+                )
+            })?;
+            let element_field = NestedField::list_element(
+                self.next_field_id(),
+                element_type,
+                !is_avro_optional(item_schema),
+            )
+            .into();
+            Ok(Some(Type::List(ListType { element_field })))
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Expected avro array schema, but {array:?}"),
+            ))
+        }
+    }
+
+    /// Converts an avro map (string keys) into a map type; key and value get
+    /// freshly generated ids, key first.
+    fn map(&mut self, map: &AvroSchema, value: Option<Type>) -> Result<Option<Type>> {
+        if let AvroSchema::Map(value_schema) = map {
+            let value_type = value.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Can't convert avro map with null value type: {map:?}"),
+                )
+            })?;
+            // Due to avro rust implementation's limitation, we can't store attributes in map
+            // schema, we will fix it later when it has been resolved.
+            let key_field = NestedField::map_key_element(
+                self.next_field_id(),
+                Type::Primitive(PrimitiveType::String),
+            );
+            let value_field = NestedField::map_value_element(
+                self.next_field_id(),
+                value_type,
+                !is_avro_optional(value_schema),
+            );
+            Ok(Some(Type::Map(MapType {
+                key_field: key_field.into(),
+                value_field: value_field.into(),
+            })))
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Expected avro map schema, but {map:?}"),
+            ))
+        }
+    }
+
+    /// Converts a leaf schema. `AvroSchema::Null` yields `Ok(None)`; schemas
+    /// without a mapping yield an error.
+    fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
+        let typ = match schema {
+            AvroSchema::Decimal(decimal) => {
+                Type::decimal(decimal.precision as u32, decimal.scale as u32)?
+            }
+            AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
+            AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
+            AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp),
+            AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid),
+            AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
+            AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
+            AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
+            AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
+            AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
+            AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String),
+            AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)),
+            AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
+            AvroSchema::Null => return Ok(None),
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Unable to convert avro {schema:?} to iceberg primitive type."),
+                ))
+            }
+        };
+
+        Ok(Some(typ))
+    }
+}
+
+/// Converts an avro schema to an iceberg schema.
+///
+/// Only record schemas can be converted; any other schema yields a
+/// `DataInvalid` error that includes the offending schema.
+pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result<Schema> {
+    if !matches!(avro_schema, AvroSchema::Record(_)) {
+        return Err(Error::new(
+            ErrorKind::DataInvalid,
+            // `format!` is required here: a plain literal would print `{avro_schema}` verbatim.
+            format!("Can't convert non record avro schema to iceberg schema: {avro_schema:?}"),
+        ));
+    }
+
+    let mut converter = AvroSchemaToSchema { next_id: 0 };
+    // The `record` callback always returns `Some`, so `None` would be a broken invariant.
+    let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none.");
+
+    match typ {
+        Type::Struct(s) => Schema::builder()
+            .with_fields(s.fields().iter().cloned())
+            .build(),
+        other => Err(Error::new(
+            ErrorKind::Unexpected,
+            format!("Expected to convert avro record schema to struct type, but {other}"),
+        )),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::avro::schema::AvroSchemaToSchema;
+ use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema,
StructType, Type};
+ use apache_avro::schema::{Namespace, UnionSchema};
+ use apache_avro::Schema as AvroSchema;
+ use std::fs::read_to_string;
+
+    /// Parses `testdata/<filename>` (relative to the crate root) as an avro schema.
+    ///
+    /// Panics with the offending path when the file can't be read or parsed.
+    fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema {
+        let path = format!("{}/testdata/{}", env!("CARGO_MANIFEST_DIR"), filename);
+        let input = read_to_string(&path)
+            .unwrap_or_else(|e| panic!("Failed to read test data file {path}: {e}"));
+
+        AvroSchema::parse_str(input.as_str())
+            .unwrap_or_else(|e| panic!("Failed to parse avro schema from {path}: {e}"))
+    }
+
+    /// Asserts that `expected_iceberg_schema` converts to `avro_schema` and, when
+    /// `check_avro_to_iceberg` is set, that the reverse conversion matches as well.
+    fn check_schema_conversion(
+        avro_schema: AvroSchema,
+        expected_iceberg_schema: Schema,
+        check_avro_to_iceberg: bool,
+    ) {
+        if check_avro_to_iceberg {
+            let actual_iceberg = avro_schema_to_schema(&avro_schema).unwrap();
+            assert_eq!(expected_iceberg_schema, actual_iceberg);
+        }
+
+        // The avro record name is reused as the name of the converted schema.
+        let record_name = avro_schema.name().unwrap().fullname(Namespace::None);
+        let actual_avro = schema_to_avro_schema(record_name, &expected_iceberg_schema).unwrap();
+        assert_eq!(avro_schema, actual_avro);
+    }
+
+    /// The v1 manifest-file schema must convert to the avro schema in the test data.
+    #[test]
+    fn test_manifest_file_v1_schema() {
+        // Element struct of the `partitions` list.
+        let partition_summary = StructType::new(vec![
+            NestedField::required(509, "contains_null", PrimitiveType::Boolean.into())
+                .with_doc("True if any file has a null partition value")
+                .into(),
+            NestedField::optional(518, "contains_nan", PrimitiveType::Boolean.into())
+                .with_doc("True if any file has a nan partition value")
+                .into(),
+            NestedField::optional(510, "lower_bound", PrimitiveType::Binary.into())
+                .with_doc("Partition lower bound for all files")
+                .into(),
+            NestedField::optional(511, "upper_bound", PrimitiveType::Binary.into())
+                .with_doc("Partition upper bound for all files")
+                .into(),
+        ]);
+        let partitions = ListType {
+            element_field: NestedField::list_element(508, partition_summary.into(), true).into(),
+        };
+
+        let fields = vec![
+            NestedField::required(500, "manifest_path", PrimitiveType::String.into())
+                .with_doc("Location URI with FS scheme")
+                .into(),
+            NestedField::required(501, "manifest_length", PrimitiveType::Long.into())
+                .with_doc("Total file size in bytes")
+                .into(),
+            NestedField::required(502, "partition_spec_id", PrimitiveType::Int.into())
+                .with_doc("Spec ID used to write")
+                .into(),
+            NestedField::optional(503, "added_snapshot_id", PrimitiveType::Long.into())
+                .with_doc("Snapshot ID that added the manifest")
+                .into(),
+            NestedField::optional(504, "added_data_files_count", PrimitiveType::Int.into())
+                .with_doc("Added entry count")
+                .into(),
+            NestedField::optional(505, "existing_data_files_count", PrimitiveType::Int.into())
+                .with_doc("Existing entry count")
+                .into(),
+            NestedField::optional(506, "deleted_data_files_count", PrimitiveType::Int.into())
+                .with_doc("Deleted entry count")
+                .into(),
+            NestedField::optional(507, "partitions", partitions.into())
+                .with_doc("Summary for each partition")
+                .into(),
+            NestedField::optional(512, "added_rows_count", PrimitiveType::Long.into())
+                .with_doc("Added rows count")
+                .into(),
+            NestedField::optional(513, "existing_rows_count", PrimitiveType::Long.into())
+                .with_doc("Existing rows count")
+                .into(),
+            NestedField::optional(514, "deleted_rows_count", PrimitiveType::Long.into())
+                .with_doc("Deleted rows count")
+                .into(),
+        ];
+
+        let iceberg_schema = Schema::builder().with_fields(fields).build().unwrap();
+        let avro_schema = read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json");
+
+        check_schema_conversion(avro_schema, iceberg_schema, false);
+    }
+
+    /// A required list of required strings converts to an avro array of `string`.
+    #[test]
+    fn test_avro_list_required_primitive() {
+        let avro_schema = AvroSchema::parse_str(
+            r#"
+{
+ "type": "record",
+ "name": "avro_schema",
+ "fields": [
+ {
+ "name": "array_with_string",
+ "type": {
+ "type": "array",
+ "items": "string",
+ "default": [],
+ "element-id": 101
+ },
+ "field-id": 100
+ }
+ ]
+}"#,
+        )
+        .unwrap();
+
+        let string_list = ListType {
+            element_field: NestedField::list_element(101, PrimitiveType::String.into(), true)
+                .into(),
+        };
+        let list_field = NestedField::required(100, "array_with_string", string_list.into());
+        let iceberg_schema = Schema::builder()
+            .with_fields(vec![list_field.into()])
+            .build()
+            .unwrap();
+
+        check_schema_conversion(avro_schema, iceberg_schema, false);
+    }
+
+    /// Same as the required-primitive case, but the avro item type is written as
+    /// `{"type": "string"}` instead of the bare name.
+    #[test]
+    fn test_avro_list_wrapped_primitive() {
+        let avro_schema = AvroSchema::parse_str(
+            r#"
+{
+ "type": "record",
+ "name": "avro_schema",
+ "fields": [
+ {
+ "name": "array_with_string",
+ "type": {
+ "type": "array",
+ "items": {"type": "string"},
+ "default": [],
+ "element-id": 101
+ },
+ "field-id": 100
+ }
+ ]
+}
+"#,
+        )
+        .unwrap();
+
+        let string_list = ListType {
+            element_field: NestedField::list_element(101, PrimitiveType::String.into(), true)
+                .into(),
+        };
+        let list_field = NestedField::required(100, "array_with_string", string_list.into());
+        let iceberg_schema = Schema::builder()
+            .with_fields(vec![list_field.into()])
+            .build()
+            .unwrap();
+
+        check_schema_conversion(avro_schema, iceberg_schema, false);
+    }
+
+    /// A required list of structs converts to an avro array of records named
+    /// after the element id (`r101`).
+    #[test]
+    fn test_avro_list_required_record() {
+        let avro_schema = AvroSchema::parse_str(
+            r#"
+{
+ "type": "record",
+ "name": "avro_schema",
+ "fields": [
+ {
+ "name": "array_with_record",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "r101",
+ "fields": [
+ {
+ "name": "contains_null",
+ "type": "boolean",
+ "field-id": 102
+ },
+ {
+ "name": "contains_nan",
+ "type": ["null", "boolean"],
+ "field-id": 103
+ }
+ ]
+ },
+ "element-id": 101
+ },
+ "field-id": 100
+ }
+ ]
+}
+"#,
+        )
+        .unwrap();
+
+        let element_struct = StructType::new(vec![
+            NestedField::required(102, "contains_null", PrimitiveType::Boolean.into()).into(),
+            NestedField::optional(103, "contains_nan", PrimitiveType::Boolean.into()).into(),
+        ]);
+        let record_list = ListType {
+            element_field: NestedField::list_element(101, element_struct.into(), true).into(),
+        };
+        let list_field = NestedField::required(100, "array_with_record", record_list.into());
+        let iceberg_schema = Schema::builder()
+            .with_fields(vec![list_field.into()])
+            .build()
+            .unwrap();
+
+        check_schema_conversion(avro_schema, iceberg_schema, false);
+    }
+
+    /// A union with more than two variants can't be converted.
+    #[test]
+    fn test_resolve_union() {
+        let variants = vec![AvroSchema::Null, AvroSchema::String, AvroSchema::Boolean];
+        let avro_schema = UnionSchema::new(variants).unwrap();
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        let mut options = Vec::new();
+        for variant in avro_schema.variants() {
+            options.push(converter.primitive(variant).unwrap());
+        }
+
+        let converted = converter.union(&avro_schema, options);
+        assert!(converted.is_err());
+    }
+
+    /// Avro `string` maps to the iceberg string primitive.
+    #[test]
+    fn test_string_type() {
+        let avro_schema = AvroSchema::String;
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        let converted = converter.primitive(&avro_schema).unwrap();
+
+        assert_eq!(Some(PrimitiveType::String.into()), converted);
+    }
+
+    /// An avro map with nullable long values becomes an iceberg map whose key and
+    /// value get the generated ids 1 and 2.
+    #[test]
+    fn test_map_type() {
+        let avro_schema = AvroSchema::parse_str(
+            r#"
+{
+ "type": "map",
+ "values": ["null", "long"],
+ "key-id": 101,
+ "value-id": 102
+}
+"#,
+        )
+        .unwrap();
+
+        let key_field = NestedField::map_key_element(1, PrimitiveType::String.into());
+        let value_field = NestedField::map_value_element(2, PrimitiveType::Long.into(), false);
+        let iceberg_type = Type::Map(MapType {
+            key_field: key_field.into(),
+            value_field: value_field.into(),
+        });
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let converted = converter
+            .map(&avro_schema, Some(PrimitiveType::Long.into()))
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(iceberg_type, converted);
+    }
+
+    /// Avro `fixed` of size 22 maps to `PrimitiveType::Fixed(22)`.
+    #[test]
+    fn test_fixed_type() {
+        let avro_schema = AvroSchema::parse_str(
+            r#"
+ {"name": "test", "type": "fixed", "size": 22}
+ "#,
+        )
+        .unwrap();
+
+        let iceberg_type = Type::from(PrimitiveType::Fixed(22));
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let converted = converter.primitive(&avro_schema).unwrap().unwrap();
+
+        assert_eq!(iceberg_type, converted);
+    }
+
+    /// Avro `duration` has no mapping and must be rejected.
+    #[test]
+    fn test_unknown_primitive() {
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        let converted = converter.primitive(&AvroSchema::Duration);
+
+        assert!(converted.is_err());
+    }
+
+    /// A record field without a `field-id` attribute can't be converted.
+    #[test]
+    fn test_no_field_id() {
+        let avro_schema = AvroSchema::parse_str(
+            r#"
+{
+ "type": "record",
+ "name": "avro_schema",
+ "fields": [
+ {
+ "name": "array_with_string",
+ "type": "string"
+ }
+ ]
+}
+"#,
+        )
+        .unwrap();
+
+        let converted = avro_schema_to_schema(&avro_schema);
+
+        assert!(converted.is_err());
+    }
+
+    /// Avro `decimal(25, 19)` maps to the iceberg decimal with the same precision and scale.
+    #[test]
+    fn test_decimal_type() {
+        let avro_schema = AvroSchema::parse_str(
+            r#"
+ {"type": "bytes", "logicalType": "decimal", "precision": 25, "scale": 19}
+ "#,
+        )
+        .unwrap();
+
+        let expected = Type::decimal(25, 19).unwrap();
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let converted = converter.primitive(&avro_schema).unwrap().unwrap();
+
+        assert_eq!(expected, converted);
+    }
+
+    /// Avro `date` maps to the iceberg date primitive.
+    #[test]
+    fn test_date_type() {
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        let converted = converter.primitive(&AvroSchema::Date).unwrap().unwrap();
+
+        assert_eq!(Type::from(PrimitiveType::Date), converted);
+    }
+}
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index b2a4ba8..48bdebc 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -289,6 +289,12 @@ define_from_err!(
"Failed to convert between uuid und iceberg value"
);
+define_from_err!( // NOTE(review): presumably generates the `apache_avro::Error` -> crate error conversion that `?` relies on in the avro module - confirm against the macro definition.
+ apache_avro::Error,
+ ErrorKind::DataInvalid,
+ "Failure in conversion with avro"
+);
+
/// Helper macro to check arguments.
///
///
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 8e77b97..5ef9ad3 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -27,4 +27,5 @@ pub use error::Error;
pub use error::ErrorKind;
pub use error::Result;
+mod avro;
pub mod spec;
diff --git a/crates/iceberg/src/spec/datatypes.rs
b/crates/iceberg/src/spec/datatypes.rs
index eef24e1..d5fc3ea 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -18,6 +18,9 @@
/*!
* Data Types
*/
+use crate::ensure_data_valid;
+use crate::error::Result;
+use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH};
use ::serde::de::{MapAccess, Visitor};
use serde::de::{Error, IntoDeserializer};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
@@ -34,6 +37,44 @@ pub(crate) const LIST_FILED_NAME: &str = "element";
pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value";
+pub(crate) const MAX_DECIMAL_BYTES: u32 = 24; // Largest decimal byte length accepted by `Type::decimal_max_precision`.
+pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38; // Largest precision accepted by `Type::decimal` and `Type::decimal_required_bytes`.
+
+/// Lazily computed lookup tables relating decimal precision and byte length.
+mod _decimal {
+    use lazy_static::lazy_static;
+
+    use crate::spec::{MAX_DECIMAL_BYTES, MAX_DECIMAL_PRECISION};
+
+    lazy_static! {
+        // Max precision of bytes, starts from 1: entry `n - 1` belongs to `n` bytes.
+        pub(super) static ref MAX_PRECISION: [u32; MAX_DECIMAL_BYTES as usize] = {
+            // Sized by the constant (was a hard-coded 24) so the table can't drift from
+            // its declared type.
+            let mut ret = [0u32; MAX_DECIMAL_BYTES as usize];
+            for (i, prec) in ret.iter_mut().enumerate() {
+                // floor(log10(2^(8n - 1))) for n = i + 1 bytes; one bit is left out of the
+                // magnitude (presumably the sign bit).
+                *prec = 2f64.powi((8 * (i + 1) - 1) as i32).log10().floor() as u32;
+            }
+
+            ret
+        };
+
+        // Required bytes of precision, starts from 1: entry `p - 1` belongs to precision `p`.
+        pub(super) static ref REQUIRED_LENGTH: [u32; MAX_DECIMAL_PRECISION as usize] = {
+            let mut ret = [0u32; MAX_DECIMAL_PRECISION as usize];
+
+            for (i, required_len) in ret.iter_mut().enumerate() {
+                let precision = (i + 1) as u32;
+                // Smallest byte length whose max precision covers `precision`; the entry
+                // stays 0 if no tabulated length is large enough.
+                if let Some(j) = MAX_PRECISION.iter().position(|&max| max >= precision) {
+                    *required_len = (j + 1) as u32;
+                }
+            }
+
+            ret
+        };
+    }
+}
+
#[derive(Debug, PartialEq, Eq, Clone)]
/// All data types are either primitives or nested types, which are maps,
lists, or structs.
pub enum Type {
@@ -70,6 +111,54 @@ impl Type {
pub fn is_struct(&self) -> bool {
matches!(self, Type::Struct(_))
}
+
+    /// Returns the max precision of a decimal stored in `num_bytes` bytes.
+    ///
+    /// `num_bytes` has to be in `1..=MAX_DECIMAL_BYTES`, otherwise the
+    /// `ensure_data_valid!` check fails.
+    #[inline(always)]
+    pub fn decimal_max_precision(num_bytes: u32) -> Result<u32> {
+        ensure_data_valid!(
+            (1..=MAX_DECIMAL_BYTES).contains(&num_bytes),
+            "Decimal length larger than {MAX_DECIMAL_BYTES} is not supported: {num_bytes}",
+        );
+        // The table is indexed from zero, byte lengths start at one.
+        let index = (num_bytes - 1) as usize;
+        Ok(MAX_PRECISION[index])
+    }
+
+    /// Returns the minimum number of bytes required for a decimal with `precision`.
+    ///
+    /// `precision` has to be in `1..=MAX_DECIMAL_PRECISION`, otherwise the
+    /// `ensure_data_valid!` check fails.
+    #[inline(always)]
+    pub fn decimal_required_bytes(precision: u32) -> Result<u32> {
+        ensure_data_valid!(
+            (1..=MAX_DECIMAL_PRECISION).contains(&precision),
+            "Decimals with precision larger than {MAX_DECIMAL_PRECISION} are not supported: {precision}",
+        );
+        // The table is indexed from zero, precisions start at one.
+        let index = (precision - 1) as usize;
+        Ok(REQUIRED_LENGTH[index])
+    }
+
+    /// Creates a decimal type with the given `precision` and `scale`.
+    ///
+    /// `precision` has to be in `1..=MAX_DECIMAL_PRECISION`, otherwise the
+    /// `ensure_data_valid!` check fails. `scale` is not validated here.
+    #[inline(always)]
+    pub fn decimal(precision: u32, scale: u32) -> Result<Self> {
+        ensure_data_valid!(
+            (1..=MAX_DECIMAL_PRECISION).contains(&precision),
+            "Decimals with precision larger than {MAX_DECIMAL_PRECISION} are not supported: {precision}",
+        );
+        let decimal = PrimitiveType::Decimal { precision, scale };
+        Ok(Type::Primitive(decimal))
+    }
+}
+
+/// Wraps a primitive type into `Type::Primitive`.
+impl From<PrimitiveType> for Type {
+    fn from(primitive: PrimitiveType) -> Self {
+        Type::Primitive(primitive)
+    }
+}
+
+/// Wraps a struct type into `Type::Struct`.
+impl From<StructType> for Type {
+    fn from(struct_type: StructType) -> Self {
+        Self::Struct(struct_type)
+    }
+}
+
+/// Wraps a list type into `Type::List`.
+impl From<ListType> for Type {
+    fn from(list_type: ListType) -> Self {
+        Self::List(list_type)
+    }
+}
+
+/// Wraps a map type into `Type::Map`.
+impl From<MapType> for Type {
+    fn from(map_type: MapType) -> Self {
+        Self::Map(map_type)
+    }
 }
/// Primitive data types
@@ -112,7 +201,7 @@ pub enum PrimitiveType {
}
impl Serialize for Type {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
where
S: Serializer,
{
@@ -122,7 +211,7 @@ impl Serialize for Type {
}
impl<'de> Deserialize<'de> for Type {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
@@ -132,7 +221,7 @@ impl<'de> Deserialize<'de> for Type {
}
impl<'de> Deserialize<'de> for PrimitiveType {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
@@ -148,7 +237,7 @@ impl<'de> Deserialize<'de> for PrimitiveType {
}
impl Serialize for PrimitiveType {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
where
S: Serializer,
{
@@ -162,7 +251,7 @@ impl Serialize for PrimitiveType {
}
}
-fn deserialize_decimal<'de, D>(deserializer: D) -> Result<PrimitiveType,
D::Error>
+fn deserialize_decimal<'de, D>(deserializer: D) ->
std::result::Result<PrimitiveType, D::Error>
where
D: Deserializer<'de>,
{
@@ -179,14 +268,18 @@ where
})
}
-fn serialize_decimal<S>(precision: &u32, scale: &u32, serializer: S) ->
Result<S::Ok, S::Error>
+fn serialize_decimal<S>(
+ precision: &u32,
+ scale: &u32,
+ serializer: S,
+) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("decimal({precision},{scale})"))
}
-fn deserialize_fixed<'de, D>(deserializer: D) -> Result<PrimitiveType,
D::Error>
+fn deserialize_fixed<'de, D>(deserializer: D) ->
std::result::Result<PrimitiveType, D::Error>
where
D: Deserializer<'de>,
{
@@ -201,7 +294,7 @@ where
.map_err(D::Error::custom)
}
-fn serialize_fixed<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
+fn serialize_fixed<S>(value: &u64, serializer: S) ->
std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
@@ -243,7 +336,7 @@ pub struct StructType {
}
impl<'de> Deserialize<'de> for StructType {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
@@ -263,7 +356,7 @@ impl<'de> Deserialize<'de> for StructType {
formatter.write_str("struct")
}
- fn visit_map<V>(self, mut map: V) -> Result<StructType, V::Error>
+ fn visit_map<V>(self, mut map: V) ->
std::result::Result<StructType, V::Error>
where
V: MapAccess<'de>,
{
@@ -633,6 +726,7 @@ pub struct MapType {
#[cfg(test)]
mod tests {
+ use pretty_assertions::assert_eq;
use uuid::Uuid;
use crate::spec::values::PrimitiveLiteral;
@@ -913,4 +1007,22 @@ mod tests {
}),
);
}
+
+    #[test]
+    fn test_decimal_precision() {
+        // Expected `Type::decimal_max_precision(n)` for n = 1..=24, stored at
+        // index n - 1. The values equal floor(log10(2^(8n - 1) - 1)), which
+        // matches the decimal digit capacity of an n-byte signed integer.
+        let expected_max_precision = [
+            2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28,
+            31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
+        ];
+        // Count from 1 so a failure names the argument that was actually
+        // passed instead of the zero-based array index.
+        for (num_bytes, expected) in (1u32..).zip(expected_max_precision) {
+            assert_eq!(
+                expected,
+                Type::decimal_max_precision(num_bytes).unwrap(),
+                "Failed to calculate max precision for {num_bytes} bytes"
+            );
+        }
+
+        // NOTE(review): these two values are consistent with
+        // `decimal_required_bytes` returning the smallest byte count whose
+        // table entry above is >= the requested precision (9 < 10 <= 11 gives
+        // 5; 38 gives 16) - confirm against the implementation.
+        assert_eq!(5, Type::decimal_required_bytes(10).unwrap());
+        assert_eq!(16, Type::decimal_required_bytes(38).unwrap());
+    }
}
diff --git a/crates/iceberg/testdata/avro_schema_manifest_entry.json
b/crates/iceberg/testdata/avro_schema_manifest_entry.json
new file mode 100644
index 0000000..876c5fa
--- /dev/null
+++ b/crates/iceberg/testdata/avro_schema_manifest_entry.json
@@ -0,0 +1,286 @@
+{
+ "type": "record",
+ "name": "manifest_entry",
+ "fields": [
+ {
+ "name": "status",
+ "type": "int",
+ "field-id": 0
+ },
+ {
+ "name": "snapshot_id",
+ "type": [
+ "null",
+ "long"
+ ],
+ "field-id": 1
+ },
+ {
+ "name": "data_file",
+ "type": {
+ "type": "record",
+ "name": "r2",
+ "fields": [
+ {
+ "name": "file_path",
+ "type": "string",
+ "doc": "Location URI with FS scheme",
+ "field-id": 100
+ },
+ {
+ "name": "file_format",
+ "type": "string",
+ "doc": "File format name: avro, orc, or parquet",
+ "field-id": 101
+ },
+ {
+ "name": "partition",
+ "type": {
+ "type": "record",
+ "name": "r102",
+ "fields": [
+ {
+ "field-id": 1000,
+ "name": "VendorID",
+ "type": [
+ "null",
+ "int"
+ ]
+ },
+ {
+ "field-id": 1001,
+ "name": "tpep_pickup_datetime",
+ "type": [
+ "null",
+ {
+ "type": "int",
+ "logicalType": "date"
+ }
+ ]
+ }
+ ]
+ },
+ "field-id": 102
+ },
+ {
+ "name": "record_count",
+ "type": "long",
+ "doc": "Number of records in the file",
+ "field-id": 103
+ },
+ {
+ "name": "file_size_in_bytes",
+ "type": "long",
+ "doc": "Total file size in bytes",
+ "field-id": 104
+ },
+ {
+ "name": "block_size_in_bytes",
+ "type": "long",
+ "field-id": 105
+ },
+ {
+ "name": "column_sizes",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k117_v118",
+ "fields": [
+ {
+ "name": "key",
+ "type": "int",
+ "field-id": 117
+ },
+ {
+ "name": "value",
+ "type": "long",
+ "field-id": 118
+ }
+ ]
+ },
+ "logicalType": "map"
+ }
+ ],
+ "doc": "Map of column id to total size on disk",
+ "field-id": 108
+ },
+ {
+ "name": "value_counts",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k119_v120",
+ "fields": [
+ {
+ "name": "key",
+ "type": "int",
+ "field-id": 119
+ },
+ {
+ "name": "value",
+ "type": "long",
+ "field-id": 120
+ }
+ ]
+ },
+ "logicalType": "map"
+ }
+ ],
+ "doc": "Map of column id to total count, including null and NaN",
+ "field-id": 109
+ },
+ {
+ "name": "null_value_counts",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k121_v122",
+ "fields": [
+ {
+ "name": "key",
+ "type": "int",
+ "field-id": 121
+ },
+ {
+ "name": "value",
+ "type": "long",
+ "field-id": 122
+ }
+ ]
+ },
+ "logicalType": "map"
+ }
+ ],
+ "doc": "Map of column id to null value count",
+ "field-id": 110
+ },
+ {
+ "name": "nan_value_counts",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k138_v139",
+ "fields": [
+ {
+ "name": "key",
+ "type": "int",
+ "field-id": 138
+ },
+ {
+ "name": "value",
+ "type": "long",
+ "field-id": 139
+ }
+ ]
+ },
+ "logicalType": "map"
+ }
+ ],
+ "doc": "Map of column id to number of NaN values in the column",
+ "field-id": 137
+ },
+ {
+ "name": "lower_bounds",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k126_v127",
+ "fields": [
+ {
+ "name": "key",
+ "type": "int",
+ "field-id": 126
+ },
+ {
+ "name": "value",
+ "type": "bytes",
+ "field-id": 127
+ }
+ ]
+ },
+ "logicalType": "map"
+ }
+ ],
+ "doc": "Map of column id to lower bound",
+ "field-id": 125
+ },
+ {
+ "name": "upper_bounds",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "k129_v130",
+ "fields": [
+ {
+ "name": "key",
+ "type": "int",
+ "field-id": 129
+ },
+ {
+ "name": "value",
+ "type": "bytes",
+ "field-id": 130
+ }
+ ]
+ },
+ "logicalType": "map"
+ }
+ ],
+ "doc": "Map of column id to upper bound",
+ "field-id": 128
+ },
+ {
+ "name": "key_metadata",
+ "type": [
+ "null",
+ "bytes"
+ ],
+ "doc": "Encryption key metadata blob",
+ "field-id": 131
+ },
+ {
+ "name": "split_offsets",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": "long",
+ "element-id": 133
+ }
+ ],
+ "doc": "Splittable offsets",
+ "field-id": 132
+ },
+ {
+ "name": "sort_order_id",
+ "type": [
+ "null",
+ "int"
+ ],
+ "doc": "Sort order ID",
+ "field-id": 140
+ }
+ ]
+ },
+ "field-id": 2
+ }
+ ]
+}
\ No newline at end of file
diff --git a/crates/iceberg/testdata/avro_schema_manifest_file_v1.json
b/crates/iceberg/testdata/avro_schema_manifest_file_v1.json
new file mode 100644
index 0000000..b185094
--- /dev/null
+++ b/crates/iceberg/testdata/avro_schema_manifest_file_v1.json
@@ -0,0 +1,139 @@
+{
+ "type": "record",
+ "name": "manifest_file",
+ "fields": [
+ {
+ "name": "manifest_path",
+ "type": "string",
+ "doc": "Location URI with FS scheme",
+ "field-id": 500
+ },
+ {
+ "name": "manifest_length",
+ "type": "long",
+ "doc": "Total file size in bytes",
+ "field-id": 501
+ },
+ {
+ "name": "partition_spec_id",
+ "type": "int",
+ "doc": "Spec ID used to write",
+ "field-id": 502
+ },
+ {
+ "name": "added_snapshot_id",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Snapshot ID that added the manifest",
+ "default": null,
+ "field-id": 503
+ },
+ {
+ "name": "added_data_files_count",
+ "type": [
+ "null",
+ "int"
+ ],
+ "doc": "Added entry count",
+ "field-id": 504
+ },
+ {
+ "name": "existing_data_files_count",
+ "type": [
+ "null",
+ "int"
+ ],
+ "doc": "Existing entry count",
+ "field-id": 505
+ },
+ {
+ "name": "deleted_data_files_count",
+ "type": [
+ "null",
+ "int"
+ ],
+ "doc": "Deleted entry count",
+ "field-id": 506
+ },
+ {
+ "name": "partitions",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "r508",
+ "fields": [
+ {
+ "name": "contains_null",
+ "type": "boolean",
+ "doc": "True if any file has a null partition value",
+ "field-id": 509
+ },
+ {
+ "name": "contains_nan",
+ "type": [
+ "null",
+ "boolean"
+ ],
+ "doc": "True if any file has a nan partition value",
+ "field-id": 518
+ },
+ {
+ "name": "lower_bound",
+ "type": [
+ "null",
+ "bytes"
+ ],
+ "doc": "Partition lower bound for all files",
+ "field-id": 510
+ },
+ {
+ "name": "upper_bound",
+ "type": [
+ "null",
+ "bytes"
+ ],
+ "doc": "Partition upper bound for all files",
+ "field-id": 511
+ }
+ ]
+ },
+ "element-id": 508
+ }
+ ],
+ "doc": "Summary for each partition",
+ "field-id": 507
+ },
+ {
+ "name": "added_rows_count",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Added rows count",
+ "field-id": 512
+ },
+ {
+ "name": "existing_rows_count",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Existing rows count",
+ "field-id": 513
+ },
+ {
+ "name": "deleted_rows_count",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Deleted rows count",
+ "field-id": 514
+ }
+ ]
+}
diff --git a/crates/iceberg/testdata/avro_schema_manifest_file_v2.json
b/crates/iceberg/testdata/avro_schema_manifest_file_v2.json
new file mode 100644
index 0000000..34b97b9
--- /dev/null
+++ b/crates/iceberg/testdata/avro_schema_manifest_file_v2.json
@@ -0,0 +1,141 @@
+{
+ "type": "record",
+ "name": "manifest_file",
+ "fields": [
+ {
+ "name": "manifest_path",
+ "type": "string",
+ "doc": "Location URI with FS scheme",
+ "field-id": 500
+ },
+ {
+ "name": "manifest_length",
+ "type": "long",
+ "doc": "Total file size in bytes",
+ "field-id": 501
+ },
+ {
+ "name": "partition_spec_id",
+ "type": "int",
+ "doc": "Spec ID used to write",
+ "field-id": 502
+ },
+ {
+ "name": "content",
+ "type": "int",
+ "doc": "Contents of the manifest: 0=data, 1=deletes",
+ "field-id": 517
+ },
+ {
+ "name": "sequence_number",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Sequence number when the manifest was added",
+ "field-id": 515
+ },
+ {
+ "name": "min_sequence_number",
+ "type": [
+ "null",
+ "long"
+ ],
+ "doc": "Lowest sequence number in the manifest",
+ "field-id": 516
+ },
+ {
+ "name": "added_snapshot_id",
+ "type": "long",
+ "doc": "Snapshot ID that added the manifest",
+ "field-id": 503
+ },
+ {
+ "name": "added_files_count",
+ "type": "int",
+ "doc": "Added entry count",
+ "field-id": 504
+ },
+ {
+ "name": "existing_files_count",
+ "type": "int",
+ "doc": "Existing entry count",
+ "field-id": 505
+ },
+ {
+ "name": "deleted_files_count",
+ "type": "int",
+ "doc": "Deleted entry count",
+ "field-id": 506
+ },
+ {
+ "name": "added_rows_count",
+ "type": "long",
+ "doc": "Added rows count",
+ "field-id": 512
+ },
+ {
+ "name": "existing_rows_count",
+ "type": "long",
+ "doc": "Existing rows count",
+ "field-id": 513
+ },
+ {
+ "name": "deleted_rows_count",
+ "type": "long",
+ "doc": "Deleted rows count",
+ "field-id": 514
+ },
+ {
+ "name": "partitions",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "r508",
+ "fields": [
+ {
+ "name": "contains_null",
+ "type": "boolean",
+ "doc": "True if any file has a null partition value",
+ "field-id": 509
+ },
+ {
+ "name": "contains_nan",
+ "type": [
+ "null",
+ "boolean"
+ ],
+ "doc": "True if any file has a nan partition value",
+ "field-id": 518
+ },
+ {
+ "name": "lower_bound",
+ "type": [
+ "null",
+ "bytes"
+ ],
+ "doc": "Partition lower bound for all files",
+ "field-id": 510
+ },
+ {
+ "name": "upper_bound",
+ "type": [
+ "null",
+ "bytes"
+ ],
+ "doc": "Partition upper bound for all files",
+ "field-id": 511
+ }
+ ]
+ },
+ "element-id": 508
+ }
+ ],
+ "doc": "Summary for each partition",
+ "field-id": 507
+ }
+ ]
+}
\ No newline at end of file