This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bdb44ae58 feat(catalog): Implement update_table for SqlCatalog (#1911)
bdb44ae58 is described below
commit bdb44ae58a45ec12a34af28037901900d6020e08
Author: Landon Gingerich <[email protected]>
AuthorDate: Wed Dec 10 04:21:38 2025 -0600
feat(catalog): Implement update_table for SqlCatalog (#1911)
## Which issue does this PR close?
- Closes the SQL catalog part of
https://github.com/apache/iceberg-rust/issues/1389
## What changes are included in this PR?
- Implement `update_table()` for SQL catalog
- Add corresponding `test_update_table` test
## Are these changes tested?
Yes. Covered by new `test_update_table` test.
---
crates/catalog/sql/src/catalog.rs | 107 ++++++++++++++++++++++++++++++++++++--
1 file changed, 102 insertions(+), 5 deletions(-)
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 77b35a228..8209cd04c 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -917,11 +917,55 @@ impl Catalog for SqlCatalog {
.build()?)
}
- async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Updating a table is not supported yet",
- ))
+ /// Updates an existing table within the SQL catalog.
+ async fn update_table(&self, commit: TableCommit) -> Result<Table> {
+ let table_ident = commit.identifier().clone();
+ let current_table = self.load_table(&table_ident).await?;
+ let current_metadata_location = current_table.metadata_location_result()?.to_string();
+
+ let staged_table = commit.apply(current_table)?;
+ let staged_metadata_location = staged_table.metadata_location_result()?;
+
+ staged_table
+ .metadata()
+ .write_to(staged_table.file_io(), &staged_metadata_location)
+ .await?;
+
+ let update_result = self
+ .execute(
+ &format!(
+ "UPDATE {CATALOG_TABLE_NAME}
+ SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
+ WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )
+ AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
+ ),
+ vec![
+ Some(staged_metadata_location),
+ Some(current_metadata_location.as_str()),
+ Some(&self.name),
+ Some(table_ident.name()),
+ Some(&table_ident.namespace().join(".")),
+ Some(current_metadata_location.as_str()),
+ ],
+ None,
+ )
+ .await?;
+
+ if update_result.rows_affected() == 0 {
+ return Err(Error::new(
+ ErrorKind::CatalogCommitConflicts,
+ format!("Commit conflicted for table: {table_ident}"),
+ )
+ .with_retryable(true));
+ }
+
+ Ok(staged_table)
}
}
@@ -932,6 +976,7 @@ mod tests {
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::table::Table;
+ use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use regex::Regex;
@@ -2293,4 +2338,56 @@ mod tests {
assert_eq!(table.identifier(), expected_table.identifier());
assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
}
+
+ #[tokio::test]
+ async fn test_update_table() {
+ let warehouse_loc = temp_path();
+ let catalog = new_sql_catalog(warehouse_loc).await;
+
+ // Create a test namespace and table
+ let namespace_ident = NamespaceIdent::new("ns1".into());
+ create_namespace(&catalog, &namespace_ident).await;
+ let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
+ create_table(&catalog, &table_ident).await;
+
+ let table = catalog.load_table(&table_ident).await.unwrap();
+
+ // Store the original metadata location for comparison
+ let original_metadata_location = table.metadata_location().unwrap().to_string();
+
+ // Create a transaction to update the table
+ let tx = Transaction::new(&table);
+ let tx = tx
+ .update_table_properties()
+ .set("test_property".to_string(), "test_value".to_string())
+ .apply(tx)
+ .unwrap();
+
+ // Commit the transaction to the catalog
+ let updated_table = tx.commit(&catalog).await.unwrap();
+
+ // Verify the update was successful
+ assert_eq!(
+ updated_table.metadata().properties().get("test_property"),
+ Some(&"test_value".to_string())
+ );
+ // Verify the metadata location has been updated
+ assert_ne!(
+ updated_table.metadata_location().unwrap(),
+ original_metadata_location.as_str()
+ );
+
+ // Load the table again from the catalog to verify changes were persisted
+ let reloaded = catalog.load_table(&table_ident).await.unwrap();
+
+ // Verify the reloaded table matches the updated table
+ assert_eq!(
+ reloaded.metadata().properties().get("test_property"),
+ Some(&"test_value".to_string())
+ );
+ assert_eq!(
+ reloaded.metadata_location(),
+ updated_table.metadata_location()
+ );
+ }
}