This is an automated email from the ASF dual-hosted git repository.

dru pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 536a7d07 fix(glue): Support create_table for S3 Tables federated databases (#3058)
536a7d07 is described below

commit 536a7d07100251fbe3f1074b9f5e7cf82a548d02
Author: James Bornholt <[email protected]>
AuthorDate: Thu Mar 19 17:08:12 2026 -0700

    fix(glue): Support create_table for S3 Tables federated databases (#3058)
---
 pyiceberg/catalog/glue.py  | 154 +++++++++++++++++++++++++++++++++++++++++++--
 tests/catalog/test_glue.py | 128 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 275 insertions(+), 7 deletions(-)

diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 83898d01..83c06c34 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -16,6 +16,7 @@
 #  under the License.
 
 
+import logging
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -48,10 +49,10 @@ from pyiceberg.exceptions import (
     NoSuchTableError,
     TableAlreadyExistsError,
 )
-from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_PROFILE_NAME, AWS_REGION, 
AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_PROFILE_NAME, AWS_REGION, 
AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, FileIO
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
-from pyiceberg.serializers import FromInputFile
+from pyiceberg.serializers import FromInputFile, ToOutputFile
 from pyiceberg.table import (
     CommitTableResponse,
     Table,
@@ -122,6 +123,8 @@ ICEBERG_FIELD_ID = "iceberg.field.id"
 ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
 ICEBERG_FIELD_CURRENT = "iceberg.field.current"
 
+logger = logging.getLogger(__name__)
+
 GLUE_PROFILE_NAME = "glue.profile-name"
 GLUE_REGION = "glue.region"
 GLUE_ACCESS_KEY_ID = "glue.access-key-id"
@@ -129,6 +132,7 @@ GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key"
 GLUE_SESSION_TOKEN = "glue.session-token"
 GLUE_MAX_RETRIES = "glue.max-retries"
 GLUE_RETRY_MODE = "glue.retry-mode"
+GLUE_CONNECTION_S3_TABLES = "aws:s3tables"
 
 MAX_RETRIES = 10
 STANDARD_RETRY_MODE = "standard"
@@ -419,6 +423,121 @@ class GlueCatalog(MetastoreCatalog):
         except self.glue.exceptions.EntityNotFoundException as e:
             raise NoSuchTableError(f"Table does not exist: 
{database_name}.{table_name}") from e
 
+    def _is_s3tables_database(self, database_name: str) -> bool:
+        """Check if a Glue database is federated with S3 Tables.
+
+        S3 Tables databases have a FederatedDatabase property with
+        ConnectionType set to aws:s3tables.
+
+        Args:
+            database_name: The name of the Glue database.
+
+        Returns:
+            True if the database is an S3 Tables federated database.
+        """
+        try:
+            database_response = self.glue.get_database(Name=database_name)
+        except self.glue.exceptions.EntityNotFoundException:
+            return False
+        database = database_response["Database"]
+        federated = database.get("FederatedDatabase", {})
+        return federated.get("ConnectionType", "") == GLUE_CONNECTION_S3_TABLES
+
+    @staticmethod
+    def _write_metadata_no_exist_check(metadata: TableMetadata, io: FileIO, 
metadata_path: str) -> None:
+        ToOutputFile.table_metadata(metadata, io.new_output(metadata_path), 
overwrite=True)
+
+    def _create_table_s3tables(
+        self,
+        identifier: str | Identifier,
+        schema: Union[Schema, "pa.Schema"],
+        location: str | None,
+        partition_spec: PartitionSpec,
+        sort_order: SortOrder,
+        properties: Properties,
+    ) -> Table:
+        """Create an Iceberg table in an S3 Tables federated database.
+
+        S3 Tables manages storage internally, so the table location is not known until the
+        table is created in the service. This method:
+          1. Creates a minimal table entry in Glue (format=ICEBERG), which causes S3 Tables
+             to allocate storage.
+          2. Retrieves the managed storage location via GetTable.
+          3. Writes Iceberg metadata to that location.
+          4. Updates the Glue table entry with the metadata pointer.
+
+        On failure, the table created in step 1 is deleted.
+        """
+        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
+
+        if location is not None:
+            raise ValueError(
+                f"Cannot specify a location for S3 Tables table 
{database_name}.{table_name}. "
+                "S3 Tables manages the storage location automatically."
+            )
+
+        # Create a minimal table in Glue so S3 Tables allocates storage.
+        self._create_glue_table(
+            database_name=database_name,
+            table_name=table_name,
+            table_input={
+                "Name": table_name,
+                "Parameters": {"format": "ICEBERG"},
+            },
+        )
+
+        try:
+            # Retrieve the managed storage location.
+            glue_table = self._get_glue_table(database_name=database_name, 
table_name=table_name)
+            storage_descriptor = glue_table.get("StorageDescriptor", {})
+            managed_location = storage_descriptor.get("Location")
+            if not managed_location:
+                raise ValueError(f"S3 Tables did not assign a storage location 
for {database_name}.{table_name}")
+
+            # Build the Iceberg metadata targeting the managed location.
+            staged_table = self._create_staged_table(
+                identifier=identifier,
+                schema=schema,
+                location=managed_location,
+                partition_spec=partition_spec,
+                sort_order=sort_order,
+                properties=properties,
+            )
+
+            # Write metadata and update the Glue table with the metadata pointer.
+            # Skip the exist check before writing; S3 Tables doesn't support ListObjectsV2.
+            self._write_metadata_no_exist_check(staged_table.metadata, 
staged_table.io, staged_table.metadata_location)
+            table_input = _construct_table_input(table_name, 
staged_table.metadata_location, properties, staged_table.metadata)
+            version_id = glue_table.get("VersionId")
+            if not version_id:
+                raise CommitFailedException(
+                    f"Cannot commit {database_name}.{table_name} because Glue 
table version id is missing"
+                )
+            self._update_glue_table(
+                database_name=database_name,
+                table_name=table_name,
+                table_input=table_input,
+                version_id=version_id,
+            )
+        except Exception:
+            # Clean up the table created in step 1.
+            try:
+                self.glue.delete_table(DatabaseName=database_name, 
Name=table_name)
+            except Exception:
+                logger.warning(
+                    f"Failed to clean up S3 Tables table 
{database_name}.{table_name}",
+                    exc_info=logger.isEnabledFor(logging.DEBUG),
+                )
+            raise
+
+        return Table(
+            identifier=self.identifier_to_tuple(identifier),
+            metadata=staged_table.metadata,
+            metadata_location=staged_table.metadata_location,
+            io=self._load_file_io(staged_table.metadata.properties, 
staged_table.metadata_location),
+            catalog=self,
+        )
+
     def create_table(
         self,
         identifier: str | Identifier,
@@ -435,6 +554,7 @@ class GlueCatalog(MetastoreCatalog):
             identifier: Table identifier.
             schema: Table's schema.
             location: Location for the table. Optional Argument.
+                Must not be set for S3 Tables, which manage their own storage.
             partition_spec: PartitionSpec for the table.
             sort_order: SortOrder for the table.
             properties: Table properties that can be a string based dictionary.
@@ -444,9 +564,22 @@ class GlueCatalog(MetastoreCatalog):
 
         Raises:
             AlreadyExistsError: If a table with the name already exists.
-            ValueError: If the identifier is invalid, or no path is given to store metadata.
+            ValueError: If the identifier is invalid, no path is given to store metadata,
+                or a location is specified for an S3 Tables table.
 
         """
+        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
+
+        if self._is_s3tables_database(database_name):
+            return self._create_table_s3tables(
+                identifier=identifier,
+                schema=schema,
+                location=location,
+                partition_spec=partition_spec,
+                sort_order=sort_order,
+                properties=properties,
+            )
+
         staged_table = self._create_staged_table(
             identifier=identifier,
             schema=schema,
@@ -455,13 +588,18 @@ class GlueCatalog(MetastoreCatalog):
             sort_order=sort_order,
             properties=properties,
         )
-        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
 
         self._write_metadata(staged_table.metadata, staged_table.io, 
staged_table.metadata_location)
         table_input = _construct_table_input(table_name, 
staged_table.metadata_location, properties, staged_table.metadata)
         self._create_glue_table(database_name=database_name, 
table_name=table_name, table_input=table_input)
 
-        return self.load_table(identifier=identifier)
+        return Table(
+            identifier=self.identifier_to_tuple(identifier),
+            metadata=staged_table.metadata,
+            metadata_location=staged_table.metadata_location,
+            io=self._load_file_io(staged_table.metadata.properties, 
staged_table.metadata_location),
+            catalog=self,
+        )
 
     def register_table(self, identifier: str | Identifier, metadata_location: 
str) -> Table:
         """Register a new table using existing metadata.
@@ -521,7 +659,11 @@ class GlueCatalog(MetastoreCatalog):
         if current_table and updated_staged_table.metadata == 
current_table.metadata:
             # no changes, do nothing
             return CommitTableResponse(metadata=current_table.metadata, 
metadata_location=current_table.metadata_location)
-        self._write_metadata(
+        # S3 Tables managed storage doesn't support ListObjectsV2, so skip the exist check.
+        write_metadata = (
+            self._write_metadata_no_exist_check if 
self._is_s3tables_database(database_name) else self._write_metadata
+        )
+        write_metadata(
             metadata=updated_staged_table.metadata,
             io=updated_staged_table.io,
             metadata_path=updated_staged_table.metadata_location,
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 5273db22..c8da49a8 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -21,7 +21,7 @@ import pyarrow as pa
 import pytest
 from moto import mock_aws
 
-from pyiceberg.catalog.glue import GlueCatalog
+from pyiceberg.catalog.glue import GLUE_CONNECTION_S3_TABLES, GlueCatalog
 from pyiceberg.exceptions import (
     NamespaceAlreadyExistsError,
     NamespaceNotEmptyError,
@@ -43,6 +43,57 @@ from tests.conftest import (
     UNIFIED_AWS_SESSION_PROPERTIES,
 )
 
+S3TABLES_WAREHOUSE_LOCATION = "s3tables-warehouse-location"
+
+
+def _patch_moto_for_s3tables(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Patch moto to simulate S3 Tables federated databases.
+
+    Moto does not support FederatedDatabase on GetDatabase responses or
+    auto-populating StorageDescriptor.Location for S3 Tables. These patches
+    simulate the S3 Tables service behavior so that the GlueCatalog S3 Tables
+    code path can be tested end-to-end with moto.
+    """
+    from moto.glue.models import FakeDatabase, FakeTable
+
+    # Patch 1: Make GetDatabase return FederatedDatabase from the stored input.
+    _original_db_as_dict = FakeDatabase.as_dict
+
+    def _db_as_dict_with_federated(self):  # type: ignore
+        result = _original_db_as_dict(self)
+        if federated := self.input.get("FederatedDatabase"):
+            result["FederatedDatabase"] = federated
+        return result
+
+    monkeypatch.setattr(FakeDatabase, "as_dict", _db_as_dict_with_federated)
+
+    # Patch 2: When a table is created with format=ICEBERG (the S3 Tables convention),
+    # inject a StorageDescriptor.Location to simulate S3 Tables vending a table
+    # warehouse location.
+    _original_table_init = FakeTable.__init__
+
+    def _table_init_with_location(self, database_name, table_name, 
table_input, catalog_id):  # type: ignore
+        if table_input.get("Parameters", {}).get("format") == "ICEBERG" and 
"StorageDescriptor" not in table_input:
+            table_input = {
+                **table_input,
+                "StorageDescriptor": {
+                    "Columns": [],
+                    "Location": 
f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/",
+                    "InputFormat": "",
+                    "OutputFormat": "",
+                    "SerdeInfo": {},
+                },
+            }
+        _original_table_init(self, database_name, table_name, table_input, 
catalog_id)
+
+    monkeypatch.setattr(FakeTable, "__init__", _table_init_with_location)
+
+    # Create a bucket backing the simulated table warehouse location. S3 Tables manages
+    # this storage internally, but in tests moto needs a real bucket for metadata file
+    # writes to succeed.
+    s3 = boto3.client("s3", region_name="us-east-1")
+    s3.create_bucket(Bucket=S3TABLES_WAREHOUSE_LOCATION)
+
 
 @mock_aws
 def test_create_table_with_database_location(
@@ -953,3 +1004,78 @@ def test_glue_client_override() -> None:
     test_client = boto3.client("glue", region_name="us-west-2")
     test_catalog = GlueCatalog(catalog_name, test_client)
     assert test_catalog.glue is test_client
+
+
+def _create_s3tables_database(catalog: GlueCatalog, database_name: str) -> 
None:
+    """Create a Glue database with S3 Tables federation metadata."""
+    catalog.glue.create_database(
+        DatabaseInput={
+            "Name": database_name,
+            "FederatedDatabase": {
+                "Identifier": 
"arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket",
+                "ConnectionType": GLUE_CONNECTION_S3_TABLES,
+            },
+        }
+    )
+
+
+@mock_aws
+def test_create_table_s3tables(
+    monkeypatch: pytest.MonkeyPatch,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
+) -> None:
+    _patch_moto_for_s3tables(monkeypatch)
+
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": 
moto_endpoint_url})
+    _create_s3tables_database(test_catalog, database_name)
+
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.name() == identifier
+    assert table.location() == 
f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}"
+    assert 
table.metadata_location.startswith(f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/metadata/00000-")
+    assert table.metadata_location.endswith(".metadata.json")
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+
+
+@mock_aws
+def test_create_table_s3tables_rejects_location(
+    monkeypatch: pytest.MonkeyPatch,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
+) -> None:
+    _patch_moto_for_s3tables(monkeypatch)
+
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": 
moto_endpoint_url})
+    _create_s3tables_database(test_catalog, database_name)
+
+    with pytest.raises(ValueError, match="Cannot specify a location for S3 
Tables table"):
+        test_catalog.create_table(identifier, table_schema_nested, 
location="s3://some-bucket/some-path")
+
+
+@mock_aws
+def test_create_table_s3tables_duplicate(
+    monkeypatch: pytest.MonkeyPatch,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
+) -> None:
+    _patch_moto_for_s3tables(monkeypatch)
+
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": 
moto_endpoint_url})
+    _create_s3tables_database(test_catalog, database_name)
+
+    test_catalog.create_table(identifier, table_schema_nested)
+    with pytest.raises(TableAlreadyExistsError):
+        test_catalog.create_table(identifier, table_schema_nested)

Reply via email to