This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e4f55d1 fix: Manifest parsing should consider schema evolution. (#171)
e4f55d1 is described below
commit e4f55d1428ed90ce28cc6c3ea0e9cbc2ed544cee
Author: Renjie Liu <[email protected]>
AuthorDate: Thu Jan 25 21:24:21 2024 +0800
fix: Manifest parsing should consider schema evolution. (#171)
* fix: Manifest parsing should consider schema evolution.
* Fix ut
---
crates/iceberg/src/spec/manifest.rs | 171 ++++++++++++++++++++++++++++++++----
1 file changed, 156 insertions(+), 15 deletions(-)
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index bdd0d0a..76e6e76 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -1328,16 +1328,11 @@ mod _serde {
) -> Result<HashMap<i32, Literal>, Error> {
let mut m = HashMap::with_capacity(v.len());
for entry in v {
- let data_type = &schema
- .field_by_id(entry.key)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Can't find field id {} for upper/lower_bounds", entry.key),
- )
- })?
- .field_type;
- m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+ // We ignore the entry if the field is not found in the schema, due to schema evolution.
+ if let Some(field) = schema.field_by_id(entry.key) {
+ let data_type = &field.field_type;
+ m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+ }
}
Ok(m)
}
@@ -1822,10 +1817,160 @@ mod tests {
assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x")));
}
+ #[tokio::test]
+ async fn test_parse_manifest_with_schema_evolution() {
+ let manifest = Manifest {
+ metadata: ManifestMetadata {
+ schema_id: 0,
+ schema: Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "v_int",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ partition_spec: PartitionSpec {
+ spec_id: 0,
+ fields: vec![],
+ },
+ content: ManifestContentType::Data,
+ format_version: FormatVersion::V2,
+ },
+ entries: vec![Arc::new(ManifestEntry {
+ status: ManifestStatus::Added,
+ snapshot_id: None,
+ sequence_number: None,
+ file_sequence_number: None,
+ data_file: DataFile {
+ content: DataContentType::Data,
+ file_format: DataFileFormat::Parquet,
+ file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+ partition: Struct::empty(),
+ record_count: 1,
+ file_size_in_bytes: 5442,
+ column_sizes: HashMap::from([
+ (1, 61),
+ (2, 73),
+ (3, 61),
+ ]),
+ value_counts: HashMap::default(),
+ null_value_counts: HashMap::default(),
+ nan_value_counts: HashMap::new(),
+ lower_bounds: HashMap::from([
+ (1, Literal::long(1)),
+ (2, Literal::int(2)),
+ (3, Literal::string("x"))
+ ]),
+ upper_bounds: HashMap::from([
+ (1, Literal::long(1)),
+ (2, Literal::int(2)),
+ (3, Literal::string("x"))
+ ]),
+ key_metadata: vec![],
+ split_offsets: vec![4],
+ equality_ids: vec![],
+ sort_order_id: None,
+ },
+ })],
+ };
+
+ let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
+
+ let (avro_bytes, _) = write_manifest(&manifest, writer).await;
+
+ // The parse should succeed.
+ let actual_manifest = Manifest::parse_avro(avro_bytes.as_slice()).unwrap();
+
+ // Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and
+ // other parts should be same.
+ let expected_manifest = Manifest {
+ metadata: ManifestMetadata {
+ schema_id: 0,
+ schema: Schema::builder()
+ .with_fields(vec![
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "v_int",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ ])
+ .build()
+ .unwrap(),
+ partition_spec: PartitionSpec {
+ spec_id: 0,
+ fields: vec![],
+ },
+ content: ManifestContentType::Data,
+ format_version: FormatVersion::V2,
+ },
+ entries: vec![Arc::new(ManifestEntry {
+ status: ManifestStatus::Added,
+ snapshot_id: None,
+ sequence_number: None,
+ file_sequence_number: None,
+ data_file: DataFile {
+ content: DataContentType::Data,
+ file_format: DataFileFormat::Parquet,
+ file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+ partition: Struct::empty(),
+ record_count: 1,
+ file_size_in_bytes: 5442,
+ column_sizes: HashMap::from([
+ (1, 61),
+ (2, 73),
+ (3, 61),
+ ]),
+ value_counts: HashMap::default(),
+ null_value_counts: HashMap::default(),
+ nan_value_counts: HashMap::new(),
+ lower_bounds: HashMap::from([
+ (1, Literal::long(1)),
+ (2, Literal::int(2)),
+ ]),
+ upper_bounds: HashMap::from([
+ (1, Literal::long(1)),
+ (2, Literal::int(2)),
+ ]),
+ key_metadata: vec![],
+ split_offsets: vec![4],
+ equality_ids: vec![],
+ sort_order_id: None,
+ },
+ })],
+ };
+
+ assert_eq!(actual_manifest, expected_manifest);
+ }
+
async fn test_manifest_read_write(
manifest: Manifest,
writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
) -> ManifestListEntry {
+ let (bs, res) = write_manifest(&manifest, writer_builder).await;
+ let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
+
+ assert_eq!(actual_manifest, manifest);
+ res
+ }
+
+ /// Utility method which writes out a manifest and returns the bytes.
+ async fn write_manifest(
+ manifest: &Manifest,
+ writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
+ ) -> (Vec<u8>, ManifestListEntry) {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("test_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
@@ -1834,10 +1979,6 @@ mod tests {
let res = writer.write(manifest.clone()).await.unwrap();
// Verify manifest
- let bs = fs::read(path).expect("read_file must succeed");
- let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
-
- assert_eq!(actual_manifest, manifest);
- res
+ (fs::read(path).expect("read_file must succeed"), res)
}
}