This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 2272e20  Glue catalog commit table (#140)
2272e20 is described below

commit 2272e207f5d790e665e1cce2272b0b2848c2a137
Author: HonahX <[email protected]>
AuthorDate: Tue Jan 2 02:27:11 2024 +0800

    Glue catalog commit table (#140)
    
    * Implement table metadata updater first draft
    
    * fix updater error and add tests
    
    * implement _commit_table for glue
    
    * implement apply_metadata_update which is simpler
    
    * remove old implementation
    
    * re-organize method place
    
    * fix nit
    
    * fix test
    
    * add another test
    
    * clear TODO
    
    * add a combined test
    
    * Fix merge conflict
    
    * update table metadata merged
    
    * implement requirements validation
    
    * change the exception to CommitFailedException
    
    * add docstring
    
    * use regex to parse the metadata version
    
    * fix lint issue
    
    * fix CI issue
    
    * make base_metadata optional and add null check
    
    * make base_metadata optional and add null check
    
    * add integration test
    
    * default skip-archive to true and comments
    
    * refactor tests
    
    * add doc and fix test after merge
    
    * make regex more robust, thanks Fokko!
    
    * Fix review comments, thanks Patrick!
---
 pyiceberg/catalog/__init__.py          |  46 +++++++++++++-
 pyiceberg/catalog/glue.py              | 113 ++++++++++++++++++++++++++++-----
 tests/catalog/integration_test_glue.py |  33 ++++++++++
 tests/catalog/test_glue.py             |  41 ++++++++++++
 tests/conftest.py                      |   4 +-
 5 files changed, 218 insertions(+), 19 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 993be87..4cb6c2f 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import logging
+import re
 import uuid
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
@@ -74,6 +75,17 @@ URI = "uri"
 LOCATION = "location"
 EXTERNAL_TABLE = "EXTERNAL_TABLE"
 
+TABLE_METADATA_FILE_NAME_REGEX = re.compile(
+    r"""
+    (\d+)              # version number
+    -                  # separator
+    ([\w-]{36})        # UUID (36 characters, including hyphens)
+    (?:\.\w+)?         # optional codec name
+    \.metadata\.json   # file extension
+    """,
+    re.X,
+)
+
 
 class CatalogType(Enum):
     REST = "rest"
@@ -587,8 +599,38 @@ class Catalog(ABC):
         ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
 
     @staticmethod
-    def _get_metadata_location(location: str) -> str:
-        return f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json"
+    def _get_metadata_location(location: str, new_version: int = 0) -> str:
+        if new_version < 0:
+            raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
+        version_str = f"{new_version:05d}"
+        return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"
+
+    @staticmethod
+    def _parse_metadata_version(metadata_location: str) -> int:
+        """Parse the version from the metadata location.
+
+        The version is the first part of the file name, before the first dash.
+        For example, the version of the metadata file
+        `s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`
+        is 1.
+        If the path does not comply with the pattern, the version is defaulted to be -1, ensuring
+        that the next metadata file is treated as having version 0.
+
+        Args:
+            metadata_location (str): The location of the metadata file.
+
+        Returns:
+            int: The version of the metadata file. -1 if the file name does not have valid version string
+        """
+        file_name = metadata_location.split("/")[-1]
+        if file_name_match := TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name):
+            try:
+                uuid.UUID(file_name_match.group(2))
+            except ValueError:
+                return -1
+            return int(file_name_match.group(1))
+        else:
+            return -1
 
     def _get_updated_props_and_update_summary(
         self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 723de2f..6cf9462 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -40,6 +40,7 @@ from pyiceberg.catalog import (
     ICEBERG,
     LOCATION,
     METADATA_LOCATION,
+    PREVIOUS_METADATA_LOCATION,
     TABLE_TYPE,
     Catalog,
     Identifier,
@@ -47,6 +48,7 @@ from pyiceberg.catalog import (
     PropertiesUpdateSummary,
 )
 from pyiceberg.exceptions import (
+    CommitFailedException,
     NamespaceAlreadyExistsError,
     NamespaceNotEmptyError,
     NoSuchIcebergTableError,
@@ -59,21 +61,40 @@ from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
 
-
-def _construct_parameters(metadata_location: str) -> Properties:
-    return {TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}
-
-
-def _construct_create_table_input(table_name: str, metadata_location: str, properties: Properties) -> TableInputTypeDef:
+# If Glue should skip archiving an old table version when creating a new version in a commit. By
+# default, Glue archives all old table versions after an UpdateTable call, but Glue has a default
+# max number of archived table versions (can be increased). So for streaming use case with lots
+# of commits, it is recommended to set this value to true.
+GLUE_SKIP_ARCHIVE = "glue.skip-archive"
+GLUE_SKIP_ARCHIVE_DEFAULT = True
+
+
+def _construct_parameters(
+    metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
+) -> Properties:
+    new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
+    new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
+    if prev_metadata_location:
+        new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location
+    return new_parameters
+
+
+def _construct_table_input(
+    table_name: str,
+    metadata_location: str,
+    properties: Properties,
+    glue_table: Optional[TableTypeDef] = None,
+    prev_metadata_location: Optional[str] = None,
+) -> TableInputTypeDef:
     table_input: TableInputTypeDef = {
         "Name": table_name,
         "TableType": EXTERNAL_TABLE,
-        "Parameters": _construct_parameters(metadata_location),
+        "Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),
     }
 
     if "Description" in properties:
@@ -177,6 +198,28 @@ class GlueCatalog(Catalog):
         except self.glue.exceptions.EntityNotFoundException as e:
             raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e
 
+    def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
+        try:
+            self.glue.update_table(
+                DatabaseName=database_name,
+                TableInput=table_input,
+                SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE, GLUE_SKIP_ARCHIVE_DEFAULT),
+                VersionId=version_id,
+            )
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name} (Glue table version {version_id})") from e
+        except self.glue.exceptions.ConcurrentModificationException as e:
+            raise CommitFailedException(
+                f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
+            ) from e
+
+    def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
+        try:
+            load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
+            return load_table_response["Table"]
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
+
     def create_table(
         self,
         identifier: Union[str, Identifier],
@@ -215,7 +258,7 @@ class GlueCatalog(Catalog):
         io = load_file_io(properties=self.properties, location=metadata_location)
         self._write_metadata(metadata, io, metadata_location)
 
-        table_input = _construct_create_table_input(table_name, metadata_location, properties)
+        table_input = _construct_table_input(table_name, metadata_location, properties)
         database_name, table_name = self.identifier_to_database_and_table(identifier)
         self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
 
@@ -247,8 +290,52 @@ class GlueCatalog(Catalog):
 
         Raises:
             NoSuchTableError: If a table with the given identifier does not exist.
+            CommitFailedException: If the commit failed.
         """
-        raise NotImplementedError
+        identifier_tuple = self.identifier_to_tuple_without_catalog(
+            tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
+        )
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
+
+        current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
+        glue_table_version_id = current_glue_table.get("VersionId")
+        if not glue_table_version_id:
+            raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
+        current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
+        base_metadata = current_table.metadata
+
+        # Validate the update requirements
+        for requirement in table_request.requirements:
+            requirement.validate(base_metadata)
+
+        updated_metadata = update_table_metadata(base_metadata, table_request.updates)
+        if updated_metadata == base_metadata:
+            # no changes, do nothing
+            return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
+
+        # write new metadata
+        new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
+        new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
+        self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
+
+        update_table_input = _construct_table_input(
+            table_name=table_name,
+            metadata_location=new_metadata_location,
+            properties=current_table.properties,
+            glue_table=current_glue_table,
+            prev_metadata_location=current_table.metadata_location,
+        )
+
+        # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
+        # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
+        self._update_glue_table(
+            database_name=database_name,
+            table_name=table_name,
+            table_input=update_table_input,
+            version_id=glue_table_version_id,
+        )
+
+        return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
 
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """Load the table's metadata and returns the table instance.
@@ -267,12 +354,8 @@ class GlueCatalog(Catalog):
         """
         identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
         database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
-        try:
-            load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
-        except self.glue.exceptions.EntityNotFoundException as e:
-            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
 
-        return self._convert_glue_to_iceberg(load_table_response["Table"])
+        return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))
 
     def drop_table(self, identifier: Union[str, Identifier]) -> None:
         """Drop a table.
diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py
index 2689ef1..99f0ada 100644
--- a/tests/catalog/integration_test_glue.py
+++ b/tests/catalog/integration_test_glue.py
@@ -31,6 +31,7 @@ from pyiceberg.exceptions import (
     TableAlreadyExistsError,
 )
 from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType
 from tests.conftest import clean_up, get_bucket_name, get_s3_path
 
 # The number of tables/databases used in list_table/namespace test
@@ -61,6 +62,7 @@ def test_create_table(
     assert table.identifier == (CATALOG_NAME,) + identifier
     metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
     s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 def test_create_table_with_invalid_location(table_schema_nested: Schema, table_name: str, database_name: str) -> None:
@@ -82,6 +84,7 @@ def test_create_table_with_default_location(
     assert table.identifier == (CATALOG_NAME,) + identifier
     metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
     s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_nested: Schema, table_name: str) -> None:
@@ -105,6 +108,7 @@ def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_na
     assert table.identifier == loaded_table.identifier
     assert table.metadata_location == loaded_table.metadata_location
     assert table.metadata == loaded_table.metadata
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_list: List[str]) -> None:
@@ -126,6 +130,7 @@ def test_rename_table(
     new_table_name = f"rename-{table_name}"
     identifier = (database_name, table_name)
     table = test_catalog.create_table(identifier, table_schema_nested)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
     assert table.identifier == (CATALOG_NAME,) + identifier
     new_identifier = (new_database_name, new_table_name)
     test_catalog.rename_table(identifier, new_identifier)
@@ -261,3 +266,31 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str)
         else:
             assert k in update_report.removed
     assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
+
+
+def test_commit_table_update_schema(
+    test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    identifier = (database_name, table_name)
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    original_table_metadata = table.metadata
+
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+    assert original_table_metadata.current_schema_id == 0
+
+    transaction = table.transaction()
+    update = transaction.update_schema()
+    update.add_column(path="b", field_type=IntegerType())
+    update.commit()
+    transaction.commit_transaction()
+
+    updated_table_metadata = table.metadata
+
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+    assert updated_table_metadata.current_schema_id == 1
+    assert len(updated_table_metadata.schemas) == 2
+    new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
+    assert new_schema
+    assert new_schema == update._apply()
+    assert new_schema.find_field("b").field_type == IntegerType()
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 6aaeb28..2a33c8e 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -31,6 +31,7 @@ from pyiceberg.exceptions import (
     TableAlreadyExistsError,
 )
 from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType
 from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
 
 
@@ -45,6 +46,7 @@ def test_create_table_with_database_location(
     table = test_catalog.create_table(identifier, table_schema_nested)
     assert table.identifier == (catalog_name,) + identifier
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 @mock_glue
@@ -58,6 +60,7 @@ def test_create_table_with_default_warehouse(
     table = test_catalog.create_table(identifier, table_schema_nested)
     assert table.identifier == (catalog_name,) + identifier
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 @mock_glue
@@ -73,6 +76,7 @@ def test_create_table_with_given_location(
     )
     assert table.identifier == (catalog_name,) + identifier
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 @mock_glue
@@ -98,6 +102,7 @@ def test_create_table_with_strips(
     table = test_catalog.create_table(identifier, table_schema_nested)
     assert table.identifier == (catalog_name,) + identifier
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 @mock_glue
@@ -111,6 +116,7 @@ def test_create_table_with_strips_bucket_root(
     table_strip = test_catalog.create_table(identifier, table_schema_nested)
     assert table_strip.identifier == (catalog_name,) + identifier
     assert TABLE_METADATA_LOCATION_REGEX.match(table_strip.metadata_location)
+    assert test_catalog._parse_metadata_version(table_strip.metadata_location) == 0
 
 
 @mock_glue
@@ -147,6 +153,7 @@ def test_load_table(
     table = test_catalog.load_table(identifier)
     assert table.identifier == (catalog_name,) + identifier
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
 
 
 @mock_glue
@@ -229,6 +236,7 @@ def test_rename_table(
     table = test_catalog.create_table(identifier, table_schema_nested)
     assert table.identifier == (catalog_name,) + identifier
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
     test_catalog.rename_table(identifier, new_identifier)
     new_table = test_catalog.load_table(new_identifier)
     assert new_table.identifier == (catalog_name,) + new_identifier
@@ -507,3 +515,36 @@ def test_passing_profile_name() -> None:
 
     mock_session.assert_called_with(**session_properties)
     assert test_catalog.glue is mock_session().client()
+
+
+@mock_glue
+def test_commit_table_update_schema(
+    _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "glue"
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    original_table_metadata = table.metadata
+
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+    assert original_table_metadata.current_schema_id == 0
+
+    transaction = table.transaction()
+    update = transaction.update_schema()
+    update.add_column(path="b", field_type=IntegerType())
+    update.commit()
+    transaction.commit_transaction()
+
+    updated_table_metadata = table.metadata
+
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+    assert updated_table_metadata.current_schema_id == 1
+    assert len(updated_table_metadata.schemas) == 2
+    new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
+    assert new_schema
+    assert new_schema == update._apply()
+    assert new_schema.find_field("b").field_type == IntegerType()
diff --git a/tests/conftest.py b/tests/conftest.py
index 1197bf2..e54f1f5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1586,7 +1586,7 @@ def fixture_aws_credentials() -> Generator[None, None, None]:
     os.environ.pop("AWS_DEFAULT_REGION")
 
 
-MOTO_SERVER = ThreadedMotoServer(port=5000)
+MOTO_SERVER = ThreadedMotoServer(ip_address="localhost", port=5000)
 
 
 def pytest_sessionfinish(
@@ -1687,7 +1687,7 @@ BUCKET_NAME = "test_bucket"
 TABLE_METADATA_LOCATION_REGEX = re.compile(
     r"""s3://test_bucket/my_iceberg_database-[a-z]{20}.db/
     my_iceberg_table-[a-z]{20}/metadata/
-    00000-[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.metadata.json""",
+    [0-9]{5}-[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.metadata.json""",
     re.X,
 )
 

Reply via email to