This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e4f55d1  fix: Manifest parsing should consider schema evolution. (#171)
e4f55d1 is described below

commit e4f55d1428ed90ce28cc6c3ea0e9cbc2ed544cee
Author: Renjie Liu <[email protected]>
AuthorDate: Thu Jan 25 21:24:21 2024 +0800

    fix: Manifest parsing should consider schema evolution. (#171)
    
    * fix: Manifest parsing should consider schema evolution.
    
    * Fix ut
---
 crates/iceberg/src/spec/manifest.rs | 171 ++++++++++++++++++++++++++++++++----
 1 file changed, 156 insertions(+), 15 deletions(-)

diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index bdd0d0a..76e6e76 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -1328,16 +1328,11 @@ mod _serde {
     ) -> Result<HashMap<i32, Literal>, Error> {
         let mut m = HashMap::with_capacity(v.len());
         for entry in v {
-            let data_type = &schema
-                .field_by_id(entry.key)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Can't find field id {} for upper/lower_bounds", entry.key),
-                    )
-                })?
-                .field_type;
-            m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+            // We ignore the entry if the field is not found in the schema, due to schema evolution.
+            if let Some(field) = schema.field_by_id(entry.key) {
+                let data_type = &field.field_type;
+                m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+            }
         }
         Ok(m)
     }
@@ -1822,10 +1817,160 @@ mod tests {
         assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x")));
     }
 
+    #[tokio::test]
+    async fn test_parse_manifest_with_schema_evolution() {
+        let manifest = Manifest {
+            metadata: ManifestMetadata {
+                schema_id: 0,
+                schema: Schema::builder()
+                    .with_fields(vec![
+                        Arc::new(NestedField::optional(
+                            1,
+                            "id",
+                            Type::Primitive(PrimitiveType::Long),
+                        )),
+                        Arc::new(NestedField::optional(
+                            2,
+                            "v_int",
+                            Type::Primitive(PrimitiveType::Int),
+                        )),
+                    ])
+                    .build()
+                    .unwrap(),
+                partition_spec: PartitionSpec {
+                    spec_id: 0,
+                    fields: vec![],
+                },
+                content: ManifestContentType::Data,
+                format_version: FormatVersion::V2,
+            },
+            entries: vec![Arc::new(ManifestEntry {
+                status: ManifestStatus::Added,
+                snapshot_id: None,
+                sequence_number: None,
+                file_sequence_number: None,
+                data_file: DataFile {
+                    content: DataContentType::Data,
+                    file_format: DataFileFormat::Parquet,
+                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+                    partition: Struct::empty(),
+                    record_count: 1,
+                    file_size_in_bytes: 5442,
+                    column_sizes: HashMap::from([
+                        (1, 61),
+                        (2, 73),
+                        (3, 61),
+                    ]),
+                    value_counts: HashMap::default(),
+                    null_value_counts: HashMap::default(),
+                    nan_value_counts: HashMap::new(),
+                    lower_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                        (3, Literal::string("x"))
+                    ]),
+                    upper_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                        (3, Literal::string("x"))
+                    ]),
+                    key_metadata: vec![],
+                    split_offsets: vec![4],
+                    equality_ids: vec![],
+                    sort_order_id: None,
+                },
+            })],
+        };
+
+        let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
+
+        let (avro_bytes, _) = write_manifest(&manifest, writer).await;
+
+        // The parse should succeed.
+        let actual_manifest = Manifest::parse_avro(avro_bytes.as_slice()).unwrap();
+
+        // Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and
+        // other parts should be same.
+        let expected_manifest = Manifest {
+            metadata: ManifestMetadata {
+                schema_id: 0,
+                schema: Schema::builder()
+                    .with_fields(vec![
+                        Arc::new(NestedField::optional(
+                            1,
+                            "id",
+                            Type::Primitive(PrimitiveType::Long),
+                        )),
+                        Arc::new(NestedField::optional(
+                            2,
+                            "v_int",
+                            Type::Primitive(PrimitiveType::Int),
+                        )),
+                    ])
+                    .build()
+                    .unwrap(),
+                partition_spec: PartitionSpec {
+                    spec_id: 0,
+                    fields: vec![],
+                },
+                content: ManifestContentType::Data,
+                format_version: FormatVersion::V2,
+            },
+            entries: vec![Arc::new(ManifestEntry {
+                status: ManifestStatus::Added,
+                snapshot_id: None,
+                sequence_number: None,
+                file_sequence_number: None,
+                data_file: DataFile {
+                    content: DataContentType::Data,
+                    file_format: DataFileFormat::Parquet,
+                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+                    partition: Struct::empty(),
+                    record_count: 1,
+                    file_size_in_bytes: 5442,
+                    column_sizes: HashMap::from([
+                        (1, 61),
+                        (2, 73),
+                        (3, 61),
+                    ]),
+                    value_counts: HashMap::default(),
+                    null_value_counts: HashMap::default(),
+                    nan_value_counts: HashMap::new(),
+                    lower_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                    ]),
+                    upper_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                    ]),
+                    key_metadata: vec![],
+                    split_offsets: vec![4],
+                    equality_ids: vec![],
+                    sort_order_id: None,
+                },
+            })],
+        };
+
+        assert_eq!(actual_manifest, expected_manifest);
+    }
+
     async fn test_manifest_read_write(
         manifest: Manifest,
         writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
     ) -> ManifestListEntry {
+        let (bs, res) = write_manifest(&manifest, writer_builder).await;
+        let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
+
+        assert_eq!(actual_manifest, manifest);
+        res
+    }
+
+    /// Utility method which writes out a manifest and returns the bytes.
+    async fn write_manifest(
+        manifest: &Manifest,
+        writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
+    ) -> (Vec<u8>, ManifestListEntry) {
         let temp_dir = TempDir::new().unwrap();
         let path = temp_dir.path().join("test_manifest.avro");
         let io = FileIOBuilder::new_fs_io().build().unwrap();
@@ -1834,10 +1979,6 @@ mod tests {
         let res = writer.write(manifest.clone()).await.unwrap();
 
         // Verify manifest
-        let bs = fs::read(path).expect("read_file must succeed");
-        let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
-
-        assert_eq!(actual_manifest, manifest);
-        res
+        (fs::read(path).expect("read_file must succeed"), res)
     }
 }

Reply via email to