This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 52296eb5 feat: Implement TableRequirement checks (#689)
52296eb5 is described below

commit 52296eb5e1be88a241903488c133214cb5a9a363
Author: Christian <[email protected]>
AuthorDate: Mon Nov 11 12:03:34 2024 +0000

    feat: Implement TableRequirement checks (#689)
    
    * Implement TableRequirement check
    
    * Address comments
---
 crates/iceberg/src/catalog/mod.rs | 309 +++++++++++++++++++++++++++++++++++++-
 crates/iceberg/src/transaction.rs |   2 +-
 2 files changed, 303 insertions(+), 8 deletions(-)

diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 854c1269..536726fd 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -29,8 +29,8 @@ use typed_builder::TypedBuilder;
 use uuid::Uuid;
 
 use crate::spec::{
-    FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, 
TableMetadataBuilder,
-    UnboundPartitionSpec, ViewRepresentations,
+    FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, 
TableMetadata,
+    TableMetadataBuilder, UnboundPartitionSpec, ViewRepresentations,
 };
 use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
@@ -312,14 +312,14 @@ pub enum TableRequirement {
     LastAssignedFieldIdMatch {
         /// The last assigned field id of the table to assert.
         #[serde(rename = "last-assigned-field-id")]
-        last_assigned_field_id: i64,
+        last_assigned_field_id: i32,
     },
     /// The table's current schema id must match the requirement.
     #[serde(rename = "assert-current-schema-id")]
     CurrentSchemaIdMatch {
         /// Current schema id of the table to assert.
         #[serde(rename = "current-schema-id")]
-        current_schema_id: i64,
+        current_schema_id: SchemaId,
     },
     /// The table's last assigned partition id must match the
     /// requirement.
@@ -327,14 +327,14 @@ pub enum TableRequirement {
     LastAssignedPartitionIdMatch {
         /// Last assigned partition id of the table to assert.
         #[serde(rename = "last-assigned-partition-id")]
-        last_assigned_partition_id: i64,
+        last_assigned_partition_id: i32,
     },
     /// The table's default spec id must match the requirement.
     #[serde(rename = "assert-default-spec-id")]
     DefaultSpecIdMatch {
         /// Default spec id of the table to assert.
         #[serde(rename = "default-spec-id")]
-        default_spec_id: i64,
+        default_spec_id: i32,
     },
     /// The table's default sort order id must match the requirement.
     #[serde(rename = "assert-default-sort-order-id")]
@@ -453,6 +453,140 @@ impl TableUpdate {
     }
 }
 
+impl TableRequirement {
+    /// Check that the requirement is met by the table metadata.
+    /// If the requirement is not met, an appropriate error is returned.
+    ///
+    /// Provide metadata as `None` if the table does not exist.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error of kind [`ErrorKind::DataInvalid`] when:
+    /// - the requirement is [`TableRequirement::NotExist`] and `metadata` is `Some`;
+    /// - the requirement is any other variant and `metadata` is `None`;
+    /// - a [`TableRequirement::RefSnapshotIdMatch`] expects a snapshot for a ref that is
+    ///   missing, or expects no ref (`snapshot_id: None`) while the ref exists;
+    /// - an asserted id or UUID differs from the one in `metadata`. These errors carry
+    ///   `expected` and `found` context entries.
+    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
+        // Only `NotExist` is satisfied by a missing table; every other requirement
+        // asserts something about existing metadata.
+        let metadata = match metadata {
+            Some(metadata) => metadata,
+            None if matches!(self, TableRequirement::NotExist) => return Ok(()),
+            None => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Requirement failed: Table does not exist",
+                ));
+            }
+        };
+
+        match self {
+            TableRequirement::NotExist => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Requirement failed: Table with id {} already exists",
+                        metadata.uuid()
+                    ),
+                ));
+            }
+            TableRequirement::UuidMatch { uuid } => {
+                if metadata.uuid() != *uuid {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Table UUID does not match",
+                    )
+                    .with_context("expected", *uuid)
+                    .with_context("found", metadata.uuid()));
+                }
+            }
+            TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
+                if metadata.current_schema_id != *current_schema_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Current schema id does not match",
+                    )
+                    .with_context("expected", current_schema_id.to_string())
+                    .with_context("found", metadata.current_schema_id.to_string()));
+                }
+            }
+            TableRequirement::DefaultSortOrderIdMatch {
+                default_sort_order_id,
+            } => {
+                let found = metadata.default_sort_order().order_id;
+                if found != *default_sort_order_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Default sort order id does not match",
+                    )
+                    .with_context("expected", default_sort_order_id.to_string())
+                    .with_context("found", found.to_string()));
+                }
+            }
+            TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
+                let snapshot_ref = metadata.snapshot_for_ref(r#ref);
+                if let Some(snapshot_id) = snapshot_id {
+                    // Build the error lazily so the message is only formatted on failure.
+                    let snapshot_ref = snapshot_ref.ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("Requirement failed: Branch or tag `{}` not found", r#ref),
+                        )
+                    })?;
+                    if snapshot_ref.snapshot_id() != *snapshot_id {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Requirement failed: Branch or tag `{}`'s snapshot has changed",
+                                r#ref
+                            ),
+                        )
+                        .with_context("expected", snapshot_id.to_string())
+                        .with_context("found", snapshot_ref.snapshot_id().to_string()));
+                    }
+                } else if snapshot_ref.is_some() {
+                    // A null snapshot id means the ref must not exist yet.
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Requirement failed: Branch or tag `{}` already exists",
+                            r#ref
+                        ),
+                    ));
+                }
+            }
+            TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
+                let found = metadata.default_partition_spec_id();
+                if found != *default_spec_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Default partition spec id does not match",
+                    )
+                    .with_context("expected", default_spec_id.to_string())
+                    .with_context("found", found.to_string()));
+                }
+            }
+            TableRequirement::LastAssignedPartitionIdMatch {
+                last_assigned_partition_id,
+            } => {
+                if metadata.last_partition_id != *last_assigned_partition_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Last assigned partition id does not match",
+                    )
+                    .with_context("expected", last_assigned_partition_id.to_string())
+                    .with_context("found", metadata.last_partition_id.to_string()));
+                }
+            }
+            TableRequirement::LastAssignedFieldIdMatch {
+                last_assigned_field_id,
+            } => {
+                if metadata.last_column_id != *last_assigned_field_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Last assigned field id does not match",
+                    )
+                    .with_context("expected", last_assigned_field_id.to_string())
+                    .with_context("found", metadata.last_column_id.to_string()));
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
 pub(super) mod _serde {
     use serde::{Deserialize as _, Deserializer};
 
@@ -549,7 +683,7 @@ mod tests {
     use crate::spec::{
         FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, 
Schema, Snapshot,
         SnapshotReference, SnapshotRetention, SortDirection, SortField, 
SortOrder, Summary,
-        TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
+        TableMetadata, TableMetadataBuilder, Transform, Type, 
UnboundPartitionSpec,
     };
     use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, 
TableUpdate};
 
@@ -593,6 +727,167 @@ mod tests {
         );
     }
 
+    /// Test fixture: metadata for a table named `table` at `/path/to/table`
+    /// with an empty schema and the nil UUID.
+    fn metadata() -> TableMetadata {
+        let empty_schema = Schema::builder().build().unwrap();
+        let creation = TableCreation::builder()
+            .name("table".to_string())
+            .location("/path/to/table".to_string())
+            .schema(empty_schema)
+            .build();
+
+        let builder = TableMetadataBuilder::from_table_creation(creation).unwrap();
+        builder
+            .assign_uuid(uuid::Uuid::nil())
+            .unwrap()
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_check_requirement_not_exist() {
+        let requirement = TableRequirement::NotExist;
+
+        // A missing table satisfies `NotExist`...
+        assert!(requirement.check(None).is_ok());
+
+        // ...while an existing table violates it.
+        let existing = metadata();
+        assert!(requirement.check(Some(&existing)).is_err());
+    }
+
+    #[test]
+    fn test_check_table_uuid() {
+        let metadata = metadata();
+
+        // A fresh v7 UUID cannot equal the fixture's nil UUID.
+        let requirement = TableRequirement::UuidMatch {
+            uuid: uuid::Uuid::now_v7(),
+        };
+        assert!(requirement.check(Some(&metadata)).is_err());
+
+        let requirement = TableRequirement::UuidMatch {
+            uuid: uuid::Uuid::nil(),
+        };
+        assert!(requirement.check(Some(&metadata)).is_ok());
+
+        // Any requirement other than `NotExist` fails when the table is missing.
+        assert!(requirement.check(None).is_err());
+    }
+
+    #[test]
+    fn test_check_ref_snapshot_id() {
+        let metadata = metadata();
+
+        // Ref does not exist but should
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "my_branch".to_string(),
+            snapshot_id: Some(1),
+        };
+        assert!(requirement.check(Some(&metadata)).is_err());
+
+        // Ref does not exist and should not
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "my_branch".to_string(),
+            snapshot_id: None,
+        };
+        assert!(requirement.check(Some(&metadata)).is_ok());
+
+        // Add snapshot
+        let record = r#"
+        {
+            "snapshot-id": 3051729675574597004,
+            "sequence-number": 10,
+            "timestamp-ms": 1515100955770,
+            "summary": {
+                "operation": "append"
+            },
+            "manifest-list": "s3://b/wh/.../s1.avro",
+            "schema-id": 0
+        }
+        "#;
+
+        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
+        let mut metadata = metadata;
+        metadata.append_snapshot(snapshot);
+
+        // Ref exists and matches
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: Some(3051729675574597004),
+        };
+        assert!(requirement.check(Some(&metadata)).is_ok());
+
+        // Ref exists but does not match
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: Some(1),
+        };
+        assert!(requirement.check(Some(&metadata)).is_err());
+
+        // Ref exists but should not (a `None` snapshot id asserts absence)
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: None,
+        };
+        assert!(requirement.check(Some(&metadata)).is_err());
+    }
+
+    #[test]
+    fn test_check_last_assigned_field_id() {
+        let metadata = metadata();
+        // Returns whether the requirement for `last_assigned_field_id` holds.
+        let holds = |last_assigned_field_id| {
+            TableRequirement::LastAssignedFieldIdMatch {
+                last_assigned_field_id,
+            }
+            .check(Some(&metadata))
+            .is_ok()
+        };
+
+        assert!(!holds(1));
+        assert!(holds(0));
+    }
+
+    #[test]
+    fn test_check_current_schema_id() {
+        let metadata = metadata();
+
+        // (asserted schema id, whether the check should pass)
+        for (current_schema_id, should_pass) in [(1, false), (0, true)] {
+            let outcome =
+                TableRequirement::CurrentSchemaIdMatch { current_schema_id }.check(Some(&metadata));
+            assert_eq!(outcome.is_ok(), should_pass);
+        }
+    }
+
+    #[test]
+    fn test_check_last_assigned_partition_id() {
+        let metadata = metadata();
+        // Returns whether the requirement for `last_assigned_partition_id` holds.
+        let holds = |last_assigned_partition_id| {
+            TableRequirement::LastAssignedPartitionIdMatch {
+                last_assigned_partition_id,
+            }
+            .check(Some(&metadata))
+            .is_ok()
+        };
+
+        assert!(!holds(1));
+        assert!(holds(0));
+    }
+
+    #[test]
+    fn test_check_default_spec_id() {
+        let metadata = metadata();
+
+        // (asserted default spec id, whether the check should pass)
+        for (default_spec_id, should_pass) in [(1, false), (0, true)] {
+            let outcome =
+                TableRequirement::DefaultSpecIdMatch { default_spec_id }.check(Some(&metadata));
+            assert_eq!(outcome.is_ok(), should_pass);
+        }
+    }
+
+    #[test]
+    fn test_check_default_sort_order_id() {
+        let metadata = metadata();
+        // Returns whether the requirement for `default_sort_order_id` holds.
+        let holds = |default_sort_order_id| {
+            TableRequirement::DefaultSortOrderIdMatch {
+                default_sort_order_id,
+            }
+            .check(Some(&metadata))
+            .is_ok()
+        };
+
+        assert!(!holds(1));
+        assert!(holds(0));
+    }
+
     #[test]
     fn test_table_uuid() {
         test_serde_json(
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index db7c3f28..f29cf512 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -154,7 +154,7 @@ impl<'a> ReplaceSortOrderAction<'a> {
 
         let requirements = vec![
             TableRequirement::CurrentSchemaIdMatch {
-                current_schema_id: 
self.tx.table.metadata().current_schema().schema_id() as i64,
+                current_schema_id: 
self.tx.table.metadata().current_schema().schema_id(),
             },
             TableRequirement::DefaultSortOrderIdMatch {
                 default_sort_order_id: 
self.tx.table.metadata().default_sort_order().order_id,

Reply via email to