This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 52296eb5 feat: Implement TableRequirement checks (#689)
52296eb5 is described below
commit 52296eb5e1be88a241903488c133214cb5a9a363
Author: Christian <[email protected]>
AuthorDate: Mon Nov 11 12:03:34 2024 +0000
feat: Implement TableRequirement checks (#689)
* Implement TableRequirement check
* Address comments
---
crates/iceberg/src/catalog/mod.rs | 309 +++++++++++++++++++++++++++++++++++++-
crates/iceberg/src/transaction.rs | 2 +-
2 files changed, 303 insertions(+), 8 deletions(-)
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 854c1269..536726fd 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -29,8 +29,8 @@ use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::spec::{
- FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder,
TableMetadataBuilder,
- UnboundPartitionSpec, ViewRepresentations,
+ FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder,
TableMetadata,
+ TableMetadataBuilder, UnboundPartitionSpec, ViewRepresentations,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
@@ -312,14 +312,14 @@ pub enum TableRequirement {
LastAssignedFieldIdMatch {
/// The last assigned field id of the table to assert.
#[serde(rename = "last-assigned-field-id")]
- last_assigned_field_id: i64,
+ last_assigned_field_id: i32,
},
/// The table's current schema id must match the requirement.
#[serde(rename = "assert-current-schema-id")]
CurrentSchemaIdMatch {
/// Current schema id of the table to assert.
#[serde(rename = "current-schema-id")]
- current_schema_id: i64,
+ current_schema_id: SchemaId,
},
/// The table's last assigned partition id must match the
/// requirement.
@@ -327,14 +327,14 @@ pub enum TableRequirement {
LastAssignedPartitionIdMatch {
/// Last assigned partition id of the table to assert.
#[serde(rename = "last-assigned-partition-id")]
- last_assigned_partition_id: i64,
+ last_assigned_partition_id: i32,
},
/// The table's default spec id must match the requirement.
#[serde(rename = "assert-default-spec-id")]
DefaultSpecIdMatch {
/// Default spec id of the table to assert.
#[serde(rename = "default-spec-id")]
- default_spec_id: i64,
+ default_spec_id: i32,
},
/// The table's default sort order id must match the requirement.
#[serde(rename = "assert-default-sort-order-id")]
@@ -453,6 +453,140 @@ impl TableUpdate {
}
}
+impl TableRequirement {
+    /// Check that the requirement is met by the table metadata.
+    ///
+    /// Provide `metadata` as `None` if the table does not exist.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`ErrorKind::DataInvalid`] error when the requirement is not
+    /// met. Errors caused by a value mismatch additionally carry `expected`
+    /// and `found` context entries.
+    pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
+        let metadata = match metadata {
+            Some(metadata) => metadata,
+            // A missing table only satisfies `NotExist`; every other
+            // requirement asserts something about existing metadata.
+            None => {
+                return match self {
+                    TableRequirement::NotExist => Ok(()),
+                    _ => Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Table does not exist",
+                    )),
+                };
+            }
+        };
+
+        match self {
+            TableRequirement::NotExist => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Requirement failed: Table with id {} already exists",
+                        metadata.uuid()
+                    ),
+                ));
+            }
+            TableRequirement::UuidMatch { uuid } => {
+                if metadata.uuid() != *uuid {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Table UUID does not match",
+                    )
+                    .with_context("expected", *uuid)
+                    .with_context("found", metadata.uuid()));
+                }
+            }
+            TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
+                if metadata.current_schema_id != *current_schema_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Current schema id does not match",
+                    )
+                    .with_context("expected", current_schema_id.to_string())
+                    .with_context("found", metadata.current_schema_id.to_string()));
+                }
+            }
+            TableRequirement::DefaultSortOrderIdMatch {
+                default_sort_order_id,
+            } => {
+                let found = metadata.default_sort_order().order_id;
+                if found != *default_sort_order_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Default sort order id does not match",
+                    )
+                    .with_context("expected", default_sort_order_id.to_string())
+                    .with_context("found", found.to_string()));
+                }
+            }
+            TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
+                let snapshot_ref = metadata.snapshot_for_ref(r#ref);
+                if let Some(snapshot_id) = snapshot_id {
+                    // Build the error lazily: `ok_or` would allocate and
+                    // format it even when the ref exists.
+                    let snapshot_ref = snapshot_ref.ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Requirement failed: Branch or tag `{}` not found",
+                                r#ref
+                            ),
+                        )
+                    })?;
+                    if snapshot_ref.snapshot_id() != *snapshot_id {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Requirement failed: Branch or tag `{}`'s snapshot has changed",
+                                r#ref
+                            ),
+                        )
+                        .with_context("expected", snapshot_id.to_string())
+                        .with_context("found", snapshot_ref.snapshot_id().to_string()));
+                    }
+                } else if snapshot_ref.is_some() {
+                    // A null snapshot id means the ref must not exist yet.
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Requirement failed: Branch or tag `{}` already exists",
+                            r#ref
+                        ),
+                    ));
+                }
+            }
+            TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
+                let found = metadata.default_partition_spec_id();
+                if found != *default_spec_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Default partition spec id does not match",
+                    )
+                    .with_context("expected", default_spec_id.to_string())
+                    .with_context("found", found.to_string()));
+                }
+            }
+            TableRequirement::LastAssignedPartitionIdMatch {
+                last_assigned_partition_id,
+            } => {
+                if metadata.last_partition_id != *last_assigned_partition_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Last assigned partition id does not match",
+                    )
+                    .with_context("expected", last_assigned_partition_id.to_string())
+                    .with_context("found", metadata.last_partition_id.to_string()));
+                }
+            }
+            TableRequirement::LastAssignedFieldIdMatch {
+                last_assigned_field_id,
+            } => {
+                if metadata.last_column_id != *last_assigned_field_id {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Requirement failed: Last assigned field id does not match",
+                    )
+                    .with_context("expected", last_assigned_field_id.to_string())
+                    .with_context("found", metadata.last_column_id.to_string()));
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
pub(super) mod _serde {
use serde::{Deserialize as _, Deserializer};
@@ -549,7 +683,7 @@ mod tests {
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType,
Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField,
SortOrder, Summary,
- TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
+ TableMetadata, TableMetadataBuilder, Transform, Type,
UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement,
TableUpdate};
@@ -593,6 +727,167 @@ mod tests {
);
}
+    /// Minimal table metadata shared by the requirement tests: an empty
+    /// schema, location `/path/to/table`, and a nil table UUID.
+    fn metadata() -> TableMetadata {
+        let schema = Schema::builder().build().unwrap();
+        let creation = TableCreation::builder()
+            .name("table".to_string())
+            .location("/path/to/table".to_string())
+            .schema(schema)
+            .build();
+
+        let builder = TableMetadataBuilder::from_table_creation(creation).unwrap();
+        builder.assign_uuid(uuid::Uuid::nil()).unwrap().build().unwrap()
+    }
+
+    #[test]
+    fn test_check_requirement_not_exist() {
+        let metadata = metadata();
+        let requirement = TableRequirement::NotExist;
+
+        assert!(requirement.check(Some(&metadata)).is_err());
+        assert!(requirement.check(None).is_ok());
+
+        // Every other requirement asserts something about existing metadata,
+        // so it must fail when the table does not exist.
+        let requirement = TableRequirement::UuidMatch {
+            uuid: uuid::Uuid::nil(),
+        };
+        assert!(requirement.check(None).is_err());
+
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: None,
+        };
+        assert!(requirement.check(None).is_err());
+    }
+
+    #[test]
+    fn test_check_table_uuid() {
+        let metadata = metadata();
+
+        // The fixture carries a nil UUID, so a fresh v7 UUID can never match.
+        let mismatching = TableRequirement::UuidMatch {
+            uuid: uuid::Uuid::now_v7(),
+        };
+        let matching = TableRequirement::UuidMatch {
+            uuid: uuid::Uuid::nil(),
+        };
+
+        assert!(mismatching.check(Some(&metadata)).is_err());
+        assert!(matching.check(Some(&metadata)).is_ok());
+    }
+
+    #[test]
+    fn test_check_ref_snapshot_id() {
+        let metadata = metadata();
+
+        // Ref does not exist but should
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "my_branch".to_string(),
+            snapshot_id: Some(1),
+        };
+        assert!(requirement.check(Some(&metadata)).is_err());
+
+        // Ref does not exist and should not
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "my_branch".to_string(),
+            snapshot_id: None,
+        };
+        assert!(requirement.check(Some(&metadata)).is_ok());
+
+        // Add snapshot
+        let record = r#"
+        {
+            "snapshot-id": 3051729675574597004,
+            "sequence-number": 10,
+            "timestamp-ms": 1515100955770,
+            "summary": {
+                "operation": "append"
+            },
+            "manifest-list": "s3://b/wh/.../s1.avro",
+            "schema-id": 0
+        }
+        "#;
+
+        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
+        let mut metadata = metadata;
+        metadata.append_snapshot(snapshot);
+
+        // Ref exists and should match
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: Some(3051729675574597004),
+        };
+        assert!(requirement.check(Some(&metadata)).is_ok());
+
+        // Ref exists but does not match
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: Some(1),
+        };
+        assert!(requirement.check(Some(&metadata)).is_err());
+
+        // Ref exists but should not: a null snapshot id asserts absence
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: None,
+        };
+        assert!(requirement.check(Some(&metadata)).is_err());
+    }
+
+    #[test]
+    fn test_check_last_assigned_field_id() {
+        let metadata = metadata();
+
+        // (requested id, whether the fixture should satisfy it)
+        for (last_assigned_field_id, should_pass) in [(1, false), (0, true)] {
+            let requirement = TableRequirement::LastAssignedFieldIdMatch {
+                last_assigned_field_id,
+            };
+            assert_eq!(requirement.check(Some(&metadata)).is_ok(), should_pass);
+        }
+    }
+
+    #[test]
+    fn test_check_current_schema_id() {
+        let metadata = metadata();
+
+        // (requested id, whether the fixture should satisfy it)
+        for (current_schema_id, should_pass) in [(1, false), (0, true)] {
+            let requirement = TableRequirement::CurrentSchemaIdMatch { current_schema_id };
+            assert_eq!(requirement.check(Some(&metadata)).is_ok(), should_pass);
+        }
+    }
+
+    #[test]
+    fn test_check_last_assigned_partition_id() {
+        let metadata = metadata();
+
+        // (requested id, whether the fixture should satisfy it)
+        for (last_assigned_partition_id, should_pass) in [(1, false), (0, true)] {
+            let requirement = TableRequirement::LastAssignedPartitionIdMatch {
+                last_assigned_partition_id,
+            };
+            assert_eq!(requirement.check(Some(&metadata)).is_ok(), should_pass);
+        }
+    }
+
+    #[test]
+    fn test_check_default_spec_id() {
+        let metadata = metadata();
+
+        // (requested id, whether the fixture should satisfy it)
+        for (default_spec_id, should_pass) in [(1, false), (0, true)] {
+            let requirement = TableRequirement::DefaultSpecIdMatch { default_spec_id };
+            assert_eq!(requirement.check(Some(&metadata)).is_ok(), should_pass);
+        }
+    }
+
+    #[test]
+    fn test_check_default_sort_order_id() {
+        let metadata = metadata();
+
+        // (requested id, whether the fixture should satisfy it)
+        for (default_sort_order_id, should_pass) in [(1, false), (0, true)] {
+            let requirement = TableRequirement::DefaultSortOrderIdMatch {
+                default_sort_order_id,
+            };
+            assert_eq!(requirement.check(Some(&metadata)).is_ok(), should_pass);
+        }
+    }
+
#[test]
fn test_table_uuid() {
test_serde_json(
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index db7c3f28..f29cf512 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -154,7 +154,7 @@ impl<'a> ReplaceSortOrderAction<'a> {
let requirements = vec![
TableRequirement::CurrentSchemaIdMatch {
- current_schema_id:
self.tx.table.metadata().current_schema().schema_id() as i64,
+ current_schema_id:
self.tx.table.metadata().current_schema().schema_id(),
},
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id:
self.tx.table.metadata().default_sort_order().order_id,