This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 760b752a Raise concurrency errors properly for glue tables (#1875)
760b752a is described below

commit 760b752aeda1884dc429252e6cf6405d8fb51fa1
Author: Jem Bishop <[email protected]>
AuthorDate: Wed Nov 19 12:16:43 2025 +0000

    Raise concurrency errors properly for glue tables (#1875)
    
    ## Which issue does this PR close?
    #1868
    
    ## What changes are included in this PR?
    
    As referenced in the issue, optimistic concurrency was not working
    correctly: we need to check that the Glue table version has not been
    incremented before we make our update, so that we get an error back if
    there is a concurrent modification.
    
    This changes `update_table` to properly handle this case.
    
    Also moved the body of `load_table` into a new helper,
    `load_table_with_version_id`; `load_table` now simply delegates to it.
    
    <!--
    Provide a summary of the modifications in this PR. List the main changes
    such as new features, bug fixes, refactoring, or any other updates.
    -->
    
    ## Are these changes tested?
    
    I have tested them using my specific setup with concurrent writers to
    Iceberg tables in AWS Glue, but not generically. Writing a test case
    doesn't seem feasible as one needs a Glue table.
    <!--
    Specify what test covers (unit test, integration test, etc.).
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ---------
    
    Co-authored-by: Jem Bishop <[email protected]>
---
 crates/catalog/glue/src/catalog.rs | 108 +++++++++++++++++++++++--------------
 1 file changed, 69 insertions(+), 39 deletions(-)

diff --git a/crates/catalog/glue/src/catalog.rs 
b/crates/catalog/glue/src/catalog.rs
index 4514f2d7..dce287ed 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -196,6 +196,62 @@ impl GlueCatalog {
     pub fn file_io(&self) -> FileIO {
         self.file_io.clone()
     }
+
+    /// Loads a table from the Glue Catalog along with its version_id for 
optimistic locking.
+    ///
+    /// # Returns
+    /// A `Result` wrapping a tuple of (`Table`, `Option<String>`) where the 
String is the version_id
+    /// from Glue that should be used for optimistic concurrency control when 
updating the table.
+    ///
+    /// # Errors
+    /// This function may return an error in several scenarios, including:
+    /// - Failure to validate the namespace.
+    /// - Failure to retrieve the table from the Glue Catalog.
+    /// - Absence of metadata location information in the table's properties.
+    /// - Issues reading or deserializing the table's metadata file.
+    async fn load_table_with_version_id(
+        &self,
+        table: &TableIdent,
+    ) -> Result<(Table, Option<String>)> {
+        let db_name = validate_namespace(table.namespace())?;
+        let table_name = table.name();
+
+        let builder = self
+            .client
+            .0
+            .get_table()
+            .database_name(&db_name)
+            .name(table_name);
+        let builder = with_catalog_id!(builder, self.config);
+
+        let glue_table_output = 
builder.send().await.map_err(from_aws_sdk_error)?;
+
+        let glue_table = glue_table_output.table().ok_or_else(|| {
+            Error::new(
+                ErrorKind::TableNotFound,
+                format!(
+                    "Table object for database: {db_name} and table: 
{table_name} does not exist"
+                ),
+            )
+        })?;
+
+        let version_id = glue_table.version_id.clone();
+        let metadata_location = get_metadata_location(&glue_table.parameters)?;
+
+        let metadata = TableMetadata::read_from(&self.file_io, 
&metadata_location).await?;
+
+        let table = Table::builder()
+            .file_io(self.file_io())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(TableIdent::new(
+                NamespaceIdent::new(db_name),
+                table_name.to_owned(),
+            ))
+            .build()?;
+
+        Ok((table, version_id))
+    }
 }
 
 #[async_trait]
@@ -514,42 +570,8 @@ impl Catalog for GlueCatalog {
     /// - Absence of metadata location information in the table's properties.
     /// - Issues reading or deserializing the table's metadata file.
     async fn load_table(&self, table: &TableIdent) -> Result<Table> {
-        let db_name = validate_namespace(table.namespace())?;
-        let table_name = table.name();
-
-        let builder = self
-            .client
-            .0
-            .get_table()
-            .database_name(&db_name)
-            .name(table_name);
-        let builder = with_catalog_id!(builder, self.config);
-
-        let glue_table_output = 
builder.send().await.map_err(from_aws_sdk_error)?;
-
-        match glue_table_output.table() {
-            None => Err(Error::new(
-                ErrorKind::TableNotFound,
-                format!(
-                    "Table object for database: {db_name} and table: 
{table_name} does not exist"
-                ),
-            )),
-            Some(table) => {
-                let metadata_location = 
get_metadata_location(&table.parameters)?;
-
-                let metadata = TableMetadata::read_from(&self.file_io, 
&metadata_location).await?;
-
-                Table::builder()
-                    .file_io(self.file_io())
-                    .metadata_location(metadata_location)
-                    .metadata(metadata)
-                    .identifier(TableIdent::new(
-                        NamespaceIdent::new(db_name),
-                        table_name.to_owned(),
-                    ))
-                    .build()
-            }
-        }
+        let (table, _) = self.load_table_with_version_id(table).await?;
+        Ok(table)
     }
 
     /// Asynchronously drops a table from the database.
@@ -761,7 +783,9 @@ impl Catalog for GlueCatalog {
     async fn update_table(&self, commit: TableCommit) -> Result<Table> {
         let table_ident = commit.identifier().clone();
         let table_namespace = validate_namespace(table_ident.namespace())?;
-        let current_table = self.load_table(&table_ident).await?;
+
+        let (current_table, current_version_id) =
+            self.load_table_with_version_id(&table_ident).await?;
         let current_metadata_location = 
current_table.metadata_location_result()?.to_string();
 
         let staged_table = commit.apply(current_table)?;
@@ -773,8 +797,8 @@ impl Catalog for GlueCatalog {
             .write_to(staged_table.file_io(), staged_metadata_location)
             .await?;
 
-        // Persist staged table to Glue
-        let builder = self
+        // Persist staged table to Glue with optimistic locking
+        let mut builder = self
             .client
             .0
             .update_table()
@@ -787,6 +811,12 @@ impl Catalog for GlueCatalog {
                 staged_table.metadata().properties(),
                 Some(current_metadata_location),
             )?);
+
+        // Add VersionId for optimistic locking
+        if let Some(version_id) = current_version_id {
+            builder = builder.version_id(version_id);
+        }
+
         let builder = with_catalog_id!(builder, self.config);
         let _ = builder.send().await.map_err(|e| {
             let error = e.into_service_error();

Reply via email to