This is an automated email from the ASF dual-hosted git repository.
dru pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 536a7d07 fix(glue): Support create_table for S3 Tables federated databases (#3058)
536a7d07 is described below
commit 536a7d07100251fbe3f1074b9f5e7cf82a548d02
Author: James Bornholt <[email protected]>
AuthorDate: Thu Mar 19 17:08:12 2026 -0700
fix(glue): Support create_table for S3 Tables federated databases (#3058)
---
pyiceberg/catalog/glue.py | 154 +++++++++++++++++++++++++++++++++++++++++++--
tests/catalog/test_glue.py | 128 ++++++++++++++++++++++++++++++++++++-
2 files changed, 275 insertions(+), 7 deletions(-)
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 83898d01..83c06c34 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -16,6 +16,7 @@
# under the License.
+import logging
from typing import (
TYPE_CHECKING,
Any,
@@ -48,10 +49,10 @@ from pyiceberg.exceptions import (
NoSuchTableError,
TableAlreadyExistsError,
)
-from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_PROFILE_NAME, AWS_REGION,
AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_PROFILE_NAME, AWS_REGION,
AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, FileIO
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
-from pyiceberg.serializers import FromInputFile
+from pyiceberg.serializers import FromInputFile, ToOutputFile
from pyiceberg.table import (
CommitTableResponse,
Table,
@@ -122,6 +123,8 @@ ICEBERG_FIELD_ID = "iceberg.field.id"
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
ICEBERG_FIELD_CURRENT = "iceberg.field.current"
+logger = logging.getLogger(__name__)
+
GLUE_PROFILE_NAME = "glue.profile-name"
GLUE_REGION = "glue.region"
GLUE_ACCESS_KEY_ID = "glue.access-key-id"
@@ -129,6 +132,7 @@ GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key"
GLUE_SESSION_TOKEN = "glue.session-token"
GLUE_MAX_RETRIES = "glue.max-retries"
GLUE_RETRY_MODE = "glue.retry-mode"
+GLUE_CONNECTION_S3_TABLES = "aws:s3tables"
MAX_RETRIES = 10
STANDARD_RETRY_MODE = "standard"
@@ -419,6 +423,121 @@ class GlueCatalog(MetastoreCatalog):
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ def _is_s3tables_database(self, database_name: str) -> bool:
+ """Check if a Glue database is federated with S3 Tables.
+
+ S3 Tables databases have a FederatedDatabase property with
+ ConnectionType set to aws:s3tables.
+
+ Args:
+ database_name: The name of the Glue database.
+
+ Returns:
+ True if the database is an S3 Tables federated database.
+ """
+ try:
+ database_response = self.glue.get_database(Name=database_name)
+ except self.glue.exceptions.EntityNotFoundException:
+ return False
+ database = database_response["Database"]
+ federated = database.get("FederatedDatabase", {})
+ return federated.get("ConnectionType", "") == GLUE_CONNECTION_S3_TABLES
+
+ @staticmethod
+ def _write_metadata_no_exist_check(metadata: TableMetadata, io: FileIO,
metadata_path: str) -> None:
+ ToOutputFile.table_metadata(metadata, io.new_output(metadata_path),
overwrite=True)
+
+ def _create_table_s3tables(
+ self,
+ identifier: str | Identifier,
+ schema: Union[Schema, "pa.Schema"],
+ location: str | None,
+ partition_spec: PartitionSpec,
+ sort_order: SortOrder,
+ properties: Properties,
+ ) -> Table:
+ """Create an Iceberg table in an S3 Tables federated database.
+
+ S3 Tables manages storage internally, so the table location is not known until the
+ table is created in the service. This method:
+ 1. Creates a minimal table entry in Glue (format=ICEBERG), which causes S3 Tables
+ to allocate storage.
+ 2. Retrieves the managed storage location via GetTable.
+ 3. Writes Iceberg metadata to that location.
+ 4. Updates the Glue table entry with the metadata pointer.
+
+ On failure, the table created in step 1 is deleted.
+ """
+ database_name, table_name =
self.identifier_to_database_and_table(identifier)
+
+ if location is not None:
+ raise ValueError(
+ f"Cannot specify a location for S3 Tables table
{database_name}.{table_name}. "
+ "S3 Tables manages the storage location automatically."
+ )
+
+ # Create a minimal table in Glue so S3 Tables allocates storage.
+ self._create_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input={
+ "Name": table_name,
+ "Parameters": {"format": "ICEBERG"},
+ },
+ )
+
+ try:
+ # Retrieve the managed storage location.
+ glue_table = self._get_glue_table(database_name=database_name,
table_name=table_name)
+ storage_descriptor = glue_table.get("StorageDescriptor", {})
+ managed_location = storage_descriptor.get("Location")
+ if not managed_location:
+ raise ValueError(f"S3 Tables did not assign a storage location
for {database_name}.{table_name}")
+
+ # Build the Iceberg metadata targeting the managed location.
+ staged_table = self._create_staged_table(
+ identifier=identifier,
+ schema=schema,
+ location=managed_location,
+ partition_spec=partition_spec,
+ sort_order=sort_order,
+ properties=properties,
+ )
+
+ # Write metadata and update the Glue table with the metadata pointer.
+ # Skip the exist check before writing; S3 Tables doesn't support ListObjectsV2.
+ self._write_metadata_no_exist_check(staged_table.metadata,
staged_table.io, staged_table.metadata_location)
+ table_input = _construct_table_input(table_name,
staged_table.metadata_location, properties, staged_table.metadata)
+ version_id = glue_table.get("VersionId")
+ if not version_id:
+ raise CommitFailedException(
+ f"Cannot commit {database_name}.{table_name} because Glue
table version id is missing"
+ )
+ self._update_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input=table_input,
+ version_id=version_id,
+ )
+ except Exception:
+ # Clean up the table created in step 1.
+ try:
+ self.glue.delete_table(DatabaseName=database_name,
Name=table_name)
+ except Exception:
+ logger.warning(
+ f"Failed to clean up S3 Tables table
{database_name}.{table_name}",
+ exc_info=logger.isEnabledFor(logging.DEBUG),
+ )
+ raise
+
+ return Table(
+ identifier=self.identifier_to_tuple(identifier),
+ metadata=staged_table.metadata,
+ metadata_location=staged_table.metadata_location,
+ io=self._load_file_io(staged_table.metadata.properties,
staged_table.metadata_location),
+ catalog=self,
+ )
+
def create_table(
self,
identifier: str | Identifier,
@@ -435,6 +554,7 @@ class GlueCatalog(MetastoreCatalog):
identifier: Table identifier.
schema: Table's schema.
location: Location for the table. Optional Argument.
+ Must not be set for S3 Tables, which manage their own storage.
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
@@ -444,9 +564,22 @@ class GlueCatalog(MetastoreCatalog):
Raises:
AlreadyExistsError: If a table with the name already exists.
- ValueError: If the identifier is invalid, or no path is given to store metadata.
+ ValueError: If the identifier is invalid, no path is given to store metadata,
+ or a location is specified for an S3 Tables table.
"""
+ database_name, table_name =
self.identifier_to_database_and_table(identifier)
+
+ if self._is_s3tables_database(database_name):
+ return self._create_table_s3tables(
+ identifier=identifier,
+ schema=schema,
+ location=location,
+ partition_spec=partition_spec,
+ sort_order=sort_order,
+ properties=properties,
+ )
+
staged_table = self._create_staged_table(
identifier=identifier,
schema=schema,
@@ -455,13 +588,18 @@ class GlueCatalog(MetastoreCatalog):
sort_order=sort_order,
properties=properties,
)
- database_name, table_name =
self.identifier_to_database_and_table(identifier)
self._write_metadata(staged_table.metadata, staged_table.io,
staged_table.metadata_location)
table_input = _construct_table_input(table_name,
staged_table.metadata_location, properties, staged_table.metadata)
self._create_glue_table(database_name=database_name,
table_name=table_name, table_input=table_input)
- return self.load_table(identifier=identifier)
+ return Table(
+ identifier=self.identifier_to_tuple(identifier),
+ metadata=staged_table.metadata,
+ metadata_location=staged_table.metadata_location,
+ io=self._load_file_io(staged_table.metadata.properties,
staged_table.metadata_location),
+ catalog=self,
+ )
def register_table(self, identifier: str | Identifier, metadata_location:
str) -> Table:
"""Register a new table using existing metadata.
@@ -521,7 +659,11 @@ class GlueCatalog(MetastoreCatalog):
if current_table and updated_staged_table.metadata ==
current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata,
metadata_location=current_table.metadata_location)
- self._write_metadata(
+ # S3 Tables managed storage doesn't support ListObjectsV2, so skip the exist check.
+ write_metadata = (
+ self._write_metadata_no_exist_check if
self._is_s3tables_database(database_name) else self._write_metadata
+ )
+ write_metadata(
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 5273db22..c8da49a8 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -21,7 +21,7 @@ import pyarrow as pa
import pytest
from moto import mock_aws
-from pyiceberg.catalog.glue import GlueCatalog
+from pyiceberg.catalog.glue import GLUE_CONNECTION_S3_TABLES, GlueCatalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
@@ -43,6 +43,57 @@ from tests.conftest import (
UNIFIED_AWS_SESSION_PROPERTIES,
)
+S3TABLES_WAREHOUSE_LOCATION = "s3tables-warehouse-location"
+
+
+def _patch_moto_for_s3tables(monkeypatch: pytest.MonkeyPatch) -> None:
+ """Patch moto to simulate S3 Tables federated databases.
+
+ Moto does not support FederatedDatabase on GetDatabase responses or
+ auto-populating StorageDescriptor.Location for S3 Tables. These patches
+ simulate the S3 Tables service behavior so that the GlueCatalog S3 Tables
+ code path can be tested end-to-end with moto.
+ """
+ from moto.glue.models import FakeDatabase, FakeTable
+
+ # Patch 1: Make GetDatabase return FederatedDatabase from the stored input.
+ _original_db_as_dict = FakeDatabase.as_dict
+
+ def _db_as_dict_with_federated(self): # type: ignore
+ result = _original_db_as_dict(self)
+ if federated := self.input.get("FederatedDatabase"):
+ result["FederatedDatabase"] = federated
+ return result
+
+ monkeypatch.setattr(FakeDatabase, "as_dict", _db_as_dict_with_federated)
+
+ # Patch 2: When a table is created with format=ICEBERG (the S3 Tables convention),
+ # inject a StorageDescriptor.Location to simulate S3 Tables vending a table
+ # warehouse location.
+ _original_table_init = FakeTable.__init__
+
+ def _table_init_with_location(self, database_name, table_name,
table_input, catalog_id): # type: ignore
+ if table_input.get("Parameters", {}).get("format") == "ICEBERG" and
"StorageDescriptor" not in table_input:
+ table_input = {
+ **table_input,
+ "StorageDescriptor": {
+ "Columns": [],
+ "Location":
f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/",
+ "InputFormat": "",
+ "OutputFormat": "",
+ "SerdeInfo": {},
+ },
+ }
+ _original_table_init(self, database_name, table_name, table_input,
catalog_id)
+
+ monkeypatch.setattr(FakeTable, "__init__", _table_init_with_location)
+
+ # Create a bucket backing the simulated table warehouse location. S3 Tables manages
+ # this storage internally, but in tests moto needs a real bucket for metadata file
+ # writes to succeed.
+ s3 = boto3.client("s3", region_name="us-east-1")
+ s3.create_bucket(Bucket=S3TABLES_WAREHOUSE_LOCATION)
+
@mock_aws
def test_create_table_with_database_location(
@@ -953,3 +1004,78 @@ def test_glue_client_override() -> None:
test_client = boto3.client("glue", region_name="us-west-2")
test_catalog = GlueCatalog(catalog_name, test_client)
assert test_catalog.glue is test_client
+
+
+def _create_s3tables_database(catalog: GlueCatalog, database_name: str) ->
None:
+ """Create a Glue database with S3 Tables federation metadata."""
+ catalog.glue.create_database(
+ DatabaseInput={
+ "Name": database_name,
+ "FederatedDatabase": {
+ "Identifier":
"arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket",
+ "ConnectionType": GLUE_CONNECTION_S3_TABLES,
+ },
+ }
+ )
+
+
+@mock_aws
+def test_create_table_s3tables(
+ monkeypatch: pytest.MonkeyPatch,
+ _bucket_initialize: None,
+ moto_endpoint_url: str,
+ table_schema_nested: Schema,
+ database_name: str,
+ table_name: str,
+) -> None:
+ _patch_moto_for_s3tables(monkeypatch)
+
+ identifier = (database_name, table_name)
+ test_catalog = GlueCatalog("s3tables", **{"s3.endpoint":
moto_endpoint_url})
+ _create_s3tables_database(test_catalog, database_name)
+
+ table = test_catalog.create_table(identifier, table_schema_nested)
+ assert table.name() == identifier
+ assert table.location() ==
f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}"
+ assert
table.metadata_location.startswith(f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/metadata/00000-")
+ assert table.metadata_location.endswith(".metadata.json")
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+
+
+@mock_aws
+def test_create_table_s3tables_rejects_location(
+ monkeypatch: pytest.MonkeyPatch,
+ _bucket_initialize: None,
+ moto_endpoint_url: str,
+ table_schema_nested: Schema,
+ database_name: str,
+ table_name: str,
+) -> None:
+ _patch_moto_for_s3tables(monkeypatch)
+
+ identifier = (database_name, table_name)
+ test_catalog = GlueCatalog("s3tables", **{"s3.endpoint":
moto_endpoint_url})
+ _create_s3tables_database(test_catalog, database_name)
+
+ with pytest.raises(ValueError, match="Cannot specify a location for S3
Tables table"):
+ test_catalog.create_table(identifier, table_schema_nested,
location="s3://some-bucket/some-path")
+
+
+@mock_aws
+def test_create_table_s3tables_duplicate(
+ monkeypatch: pytest.MonkeyPatch,
+ _bucket_initialize: None,
+ moto_endpoint_url: str,
+ table_schema_nested: Schema,
+ database_name: str,
+ table_name: str,
+) -> None:
+ _patch_moto_for_s3tables(monkeypatch)
+
+ identifier = (database_name, table_name)
+ test_catalog = GlueCatalog("s3tables", **{"s3.endpoint":
moto_endpoint_url})
+ _create_s3tables_database(test_catalog, database_name)
+
+ test_catalog.create_table(identifier, table_schema_nested)
+ with pytest.raises(TableAlreadyExistsError):
+ test_catalog.create_table(identifier, table_schema_nested)