This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ffb691d fix (manifest-list): added serde aliases to support both
naming conventions (#365)
ffb691d is described below
commit ffb691da60ae37b3eb4b1838e3f006838d2d3807
Author: Alon Agmon <[email protected]>
AuthorDate: Thu May 9 15:45:06 2024 +0300
fix (manifest-list): added serde aliases to support both naming conventions
(#365)
* added serde aliases to support both field-naming conventions
* read manifest lists using the writer's Avro schema instead of a fixed reader schema
* added Avro manifest-list files of both versions and a test that deserializes both
* fixed typo
---
crates/iceberg/src/spec/manifest_list.rs | 62 +++++++++++++++++++--
.../manifests_lists/manifest-list-v2-1.avro | Bin 0 -> 4247 bytes
.../manifests_lists/manifest-list-v2-2.avro | Bin 0 -> 4218 bytes
3 files changed, 57 insertions(+), 5 deletions(-)
diff --git a/crates/iceberg/src/spec/manifest_list.rs
b/crates/iceberg/src/spec/manifest_list.rs
index da63815..c390bee 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -68,7 +68,7 @@ impl ManifestList {
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider)
}
FormatVersion::V2 => {
- let reader =
Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?;
+ let reader = Reader::new(bs)?;
let values =
Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type_provider)
}
@@ -802,6 +802,9 @@ pub(super) mod _serde {
pub key_metadata: Option<ByteBuf>,
}
+    // Aliases were added to fields that were renamed in Iceberg 1.5.0
+    // (https://github.com/apache/iceberg/pull/5338) so that manifest lists written
+    // under either naming convention can be read.
+    // Deserialization is currently done by field name, so these fields may appear
+    // under either their old or their new name.
+    // See the issue that raised this: https://github.com/apache/iceberg-rust/issues/338
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct ManifestFileV2 {
pub manifest_path: String,
@@ -811,8 +814,11 @@ pub(super) mod _serde {
pub sequence_number: i64,
pub min_sequence_number: i64,
pub added_snapshot_id: i64,
+ #[serde(alias = "added_data_files_count", alias = "added_files_count")]
pub added_data_files_count: i32,
+ #[serde(alias = "existing_data_files_count", alias =
"existing_files_count")]
pub existing_data_files_count: i32,
+ #[serde(alias = "deleted_data_files_count", alias =
"deleted_files_count")]
pub deleted_data_files_count: i32,
pub added_rows_count: i64,
pub existing_rows_count: i64,
@@ -1089,16 +1095,16 @@ pub(super) mod _serde {
#[cfg(test)]
mod test {
+ use apache_avro::{Reader, Schema};
use std::{collections::HashMap, fs, sync::Arc};
-
use tempfile::TempDir;
use crate::{
io::FileIOBuilder,
spec::{
- manifest_list::{_serde::ManifestListV1,
UNASSIGNED_SEQUENCE_NUMBER},
- FieldSummary, Literal, ManifestContentType, ManifestFile,
ManifestList,
- ManifestListWriter, NestedField, PrimitiveType, StructType, Type,
+ manifest_list::_serde::ManifestListV1, FieldSummary, Literal,
ManifestContentType,
+ ManifestFile, ManifestList, ManifestListWriter, NestedField,
PrimitiveType, StructType,
+ Type, UNASSIGNED_SEQUENCE_NUMBER,
},
};
@@ -1462,4 +1468,50 @@ mod test {
temp_dir.close().unwrap();
}
+
+    /// Checks that V2 manifest lists written under both field-naming conventions parse.
+    ///
+    /// Iceberg renamed the manifest-list `*_data_files_count` fields to `*_files_count`
+    /// in 1.5.0 (apache/iceberg#5338). The two fixtures were written by 1.4.0 and 1.5.0
+    /// respectively; the schema assertions pin which convention each file uses, and both
+    /// files must then deserialize into a `ManifestList`.
+    #[tokio::test]
+    async fn test_manifest_list_v2_deserializer_aliases() {
+        // Manifest list file generated by Iceberg 1.4.0 (`*_data_files_count` names).
+        let avro_1_path = "testdata/manifests_lists/manifest-list-v2-1.avro";
+        let bs_1 = fs::read(avro_1_path).unwrap();
+        let avro_1_fields = read_avro_schema_fields_as_str(bs_1.clone()).await;
+        // `concat!` keeps each expected value a single-line string no matter how the
+        // source lines are wrapped.
+        assert_eq!(
+            avro_1_fields,
+            concat!(
+                "manifest_path, manifest_length, partition_spec_id, content, ",
+                "sequence_number, min_sequence_number, added_snapshot_id, ",
+                "added_data_files_count, existing_data_files_count, deleted_data_files_count, ",
+                "added_rows_count, existing_rows_count, deleted_rows_count, partitions"
+            )
+        );
+        // Manifest list file generated by Iceberg 1.5.0 (`*_files_count` names).
+        let avro_2_path = "testdata/manifests_lists/manifest-list-v2-2.avro";
+        let bs_2 = fs::read(avro_2_path).unwrap();
+        let avro_2_fields = read_avro_schema_fields_as_str(bs_2.clone()).await;
+        assert_eq!(
+            avro_2_fields,
+            concat!(
+                "manifest_path, manifest_length, partition_spec_id, content, ",
+                "sequence_number, min_sequence_number, added_snapshot_id, ",
+                "added_files_count, existing_files_count, deleted_files_count, ",
+                "added_rows_count, existing_rows_count, deleted_rows_count, partitions"
+            )
+        );
+        // Both conventions must deserialize into a `ManifestList`; every partition spec
+        // id is resolved to an empty struct type.
+        let _manifest_list_1 =
+            ManifestList::parse_with_version(&bs_1, crate::spec::FormatVersion::V2, move |_id| {
+                Ok(Some(StructType::new(vec![])))
+            })
+            .unwrap();
+        let _manifest_list_2 =
+            ManifestList::parse_with_version(&bs_2, crate::spec::FormatVersion::V2, move |_id| {
+                Ok(Some(StructType::new(vec![])))
+            })
+            .unwrap();
+    }
+
+    /// Returns the top-level field names of the writer schema embedded in the Avro
+    /// container `bs`, joined with ", ".
+    ///
+    /// Yields an empty string when the writer schema is not a record. Panics if `bs`
+    /// cannot be opened as an Avro container.
+    async fn read_avro_schema_fields_as_str(bs: Vec<u8>) -> String {
+        let reader = Reader::new(bs.as_slice()).unwrap();
+        let field_names: Vec<&str> = match reader.writer_schema() {
+            Schema::Record(record) => record
+                .fields
+                .iter()
+                .map(|field| field.name.as_str())
+                .collect(),
+            _ => Vec::new(),
+        };
+        field_names.join(", ")
+    }
}
diff --git a/crates/iceberg/testdata/manifests_lists/manifest-list-v2-1.avro
b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-1.avro
new file mode 100644
index 0000000..5c5cdb1
Binary files /dev/null and
b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-1.avro differ
diff --git a/crates/iceberg/testdata/manifests_lists/manifest-list-v2-2.avro
b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-2.avro
new file mode 100644
index 0000000..00784ff
Binary files /dev/null and
b/crates/iceberg/testdata/manifests_lists/manifest-list-v2-2.avro differ