This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 87acf3cbb feat(catalog): Implement update_table for S3TablesCatalog 
(#1594)
87acf3cbb is described below

commit 87acf3cbb77ecd5c0ed3e1b2eb4e8327223bc706
Author: Shawn Chang <[email protected]>
AuthorDate: Tue Oct 28 10:34:01 2025 -0700

    feat(catalog): Implement update_table for S3TablesCatalog (#1594)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Addresses the S3Table part of #1389
    
    ## What changes are included in this PR?
    - Implemented update_table for S3TablesCatalog
    <!--
    Provide a summary of the modifications in this PR. List the main changes
    such as new features, bug fixes, refactoring, or any other updates.
    -->
    
    ## Are these changes tested?
    added a test
    <!--
    Specify what test covers (unit test, integration test, etc.).
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Xuanwo <[email protected]>
---
 Cargo.lock                             |   1 +
 crates/catalog/s3tables/Cargo.toml     |   1 +
 crates/catalog/s3tables/src/catalog.rs | 199 +++++++++++++++++++++++++++------
 3 files changed, 167 insertions(+), 34 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f4085e59e..808c222dd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3743,6 +3743,7 @@ dependencies = [
 name = "iceberg-catalog-s3tables"
 version = "0.7.0"
 dependencies = [
+ "anyhow",
  "async-trait",
  "aws-config",
  "aws-sdk-s3tables",
diff --git a/crates/catalog/s3tables/Cargo.toml 
b/crates/catalog/s3tables/Cargo.toml
index c5df48969..66fb70fef 100644
--- a/crates/catalog/s3tables/Cargo.toml
+++ b/crates/catalog/s3tables/Cargo.toml
@@ -29,6 +29,7 @@ license = { workspace = true }
 repository = { workspace = true }
 
 [dependencies]
+anyhow = { workspace = true }
 async-trait = { workspace = true }
 aws-config = { workspace = true }
 aws-sdk-s3tables = { workspace = true }
diff --git a/crates/catalog/s3tables/src/catalog.rs 
b/crates/catalog/s3tables/src/catalog.rs
index 858bb9289..daa659055 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -23,6 +23,7 @@ use 
aws_sdk_s3tables::operation::create_table::CreateTableOutput;
 use aws_sdk_s3tables::operation::get_namespace::GetNamespaceOutput;
 use aws_sdk_s3tables::operation::get_table::GetTableOutput;
 use aws_sdk_s3tables::operation::list_tables::ListTablesOutput;
+use 
aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError;
 use aws_sdk_s3tables::types::OpenTableFormat;
 use iceberg::io::{FileIO, FileIOBuilder};
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
@@ -189,6 +190,39 @@ impl S3TablesCatalog {
             file_io,
         })
     }
+
+    async fn load_table_with_version_token(
+        &self,
+        table_ident: &TableIdent,
+    ) -> Result<(Table, String)> {
+        let req = self
+            .s3tables_client
+            .get_table()
+            .table_bucket_arn(self.config.table_bucket_arn.clone())
+            .namespace(table_ident.namespace().to_url_string())
+            .name(table_ident.name());
+        let resp: GetTableOutput = 
req.send().await.map_err(from_aws_sdk_error)?;
+
+        // when a table is created, it's possible that the metadata location 
is not set.
+        let metadata_location = resp.metadata_location().ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "Table {} does not have metadata location",
+                    table_ident.name()
+                ),
+            )
+        })?;
+        let metadata = TableMetadata::read_from(&self.file_io, 
metadata_location).await?;
+
+        let table = Table::builder()
+            .identifier(table_ident.clone())
+            .metadata(metadata)
+            .metadata_location(metadata_location)
+            .file_io(self.file_io.clone())
+            .build()?;
+        Ok((table, resp.version_token))
+    }
 }
 
 #[async_trait]
@@ -477,33 +511,7 @@ impl Catalog for S3TablesCatalog {
     /// - Errors from the underlying database query process, converted using
     /// `from_aws_sdk_error`.
     async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
-        let req = self
-            .s3tables_client
-            .get_table()
-            .table_bucket_arn(self.config.table_bucket_arn.clone())
-            .namespace(table_ident.namespace().to_url_string())
-            .name(table_ident.name());
-        let resp: GetTableOutput = 
req.send().await.map_err(from_aws_sdk_error)?;
-
-        // when a table is created, it's possible that the metadata location 
is not set.
-        let metadata_location = resp.metadata_location().ok_or_else(|| {
-            Error::new(
-                ErrorKind::Unexpected,
-                format!(
-                    "Table {} does not have metadata location",
-                    table_ident.name()
-                ),
-            )
-        })?;
-        let metadata = TableMetadata::read_from(&self.file_io, 
metadata_location).await?;
-
-        let table = Table::builder()
-            .identifier(table_ident.clone())
-            .metadata(metadata)
-            .metadata_location(metadata_location)
-            .file_io(self.file_io.clone())
-            .build()?;
-        Ok(table)
+        Ok(self.load_table_with_version_token(table_ident).await?.0)
     }
 
     /// Drops an existing table from the s3tables catalog.
@@ -589,13 +597,50 @@ impl Catalog for S3TablesCatalog {
     }
 
     /// Updates an existing table within the s3tables catalog.
-    ///
-    /// This function is still in development and will always return an error.
-    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "Updating a table is not supported yet",
-        ))
+    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
+        let table_ident = commit.identifier().clone();
+        let table_namespace = table_ident.namespace();
+        let (current_table, version_token) =
+            self.load_table_with_version_token(&table_ident).await?;
+
+        let staged_table = commit.apply(current_table)?;
+        let staged_metadata_location = 
staged_table.metadata_location_result()?;
+
+        staged_table
+            .metadata()
+            .write_to(staged_table.file_io(), staged_metadata_location)
+            .await?;
+
+        let builder = self
+            .s3tables_client
+            .update_table_metadata_location()
+            .table_bucket_arn(&self.config.table_bucket_arn)
+            .namespace(table_namespace.to_url_string())
+            .name(table_ident.name())
+            .version_token(version_token)
+            .metadata_location(staged_metadata_location);
+
+        let _ = builder.send().await.map_err(|e| {
+            let error = e.into_service_error();
+            match error {
+                UpdateTableMetadataLocationError::ConflictException(_) => 
Error::new(
+                    ErrorKind::CatalogCommitConflicts,
+                    format!("Commit conflicted for table: {table_ident}"),
+                )
+                .with_retryable(true),
+                UpdateTableMetadataLocationError::NotFoundException(_) => 
Error::new(
+                    ErrorKind::TableNotFound,
+                    format!("Table {table_ident} is not found"),
+                ),
+                _ => Error::new(
+                    ErrorKind::Unexpected,
+                    "Operation failed for hitting aws sdk error",
+                ),
+            }
+            .with_source(anyhow::Error::msg(format!("aws sdk error: {:?}", 
error)))
+        })?;
+
+        Ok(staged_table)
     }
 }
 
@@ -611,6 +656,7 @@ where T: std::fmt::Debug {
 #[cfg(test)]
 mod tests {
     use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+    use iceberg::transaction::{ApplyTransactionAction, Transaction};
 
     use super::*;
 
@@ -737,6 +783,91 @@ mod tests {
         catalog.drop_namespace(&namespace).await.unwrap();
     }
 
+    #[tokio::test]
+    async fn test_s3tables_update_table() {
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(catalog)) => catalog,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {}", e),
+        };
+
+        // Create a test namespace and table
+        let namespace = 
NamespaceIdent::new("test_s3tables_update_table".to_string());
+        let table_ident =
+            TableIdent::new(namespace.clone(), 
"test_s3tables_update_table".to_string());
+
+        // Clean up any existing resources from previous test runs
+        catalog.drop_table(&table_ident).await.ok();
+        catalog.drop_namespace(&namespace).await.ok();
+
+        // Create namespace and table
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .unwrap();
+
+        let creation = {
+            let schema = Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::required(1, "foo", 
Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "bar", 
Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()
+                .unwrap();
+            TableCreation::builder()
+                .name(table_ident.name().to_string())
+                .properties(HashMap::new())
+                .schema(schema)
+                .build()
+        };
+
+        let table = catalog.create_table(&namespace, creation).await.unwrap();
+
+        // Create a transaction to update the table
+        let tx = Transaction::new(&table);
+
+        // Store the original metadata location for comparison
+        let original_metadata_location = table.metadata_location();
+
+        // Update table properties using the transaction
+        let tx = tx
+            .update_table_properties()
+            .set("test_property".to_string(), "test_value".to_string())
+            .apply(tx)
+            .unwrap();
+
+        // Commit the transaction to the catalog
+        let updated_table = tx.commit(&catalog).await.unwrap();
+
+        // Verify the update was successful
+        assert_eq!(
+            updated_table.metadata().properties().get("test_property"),
+            Some(&"test_value".to_string())
+        );
+
+        // Verify the metadata location has been updated
+        assert_ne!(
+            updated_table.metadata_location(),
+            original_metadata_location,
+            "Metadata location should be updated after commit"
+        );
+
+        // Load the table again from the catalog to verify changes were 
persisted
+        let reloaded_table = catalog.load_table(&table_ident).await.unwrap();
+
+        // Verify the reloaded table matches the updated table
+        assert_eq!(
+            reloaded_table.metadata().properties().get("test_property"),
+            Some(&"test_value".to_string())
+        );
+        assert_eq!(
+            reloaded_table.metadata_location(),
+            updated_table.metadata_location(),
+            "Reloaded table should have the same metadata location as the 
updated table"
+        );
+    }
+
     #[tokio::test]
     async fn test_builder_load_missing_bucket_arn() {
         let builder = S3TablesCatalogBuilder::default();

Reply via email to