This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 2272e20 Glue catalog commit table (#140)
2272e20 is described below
commit 2272e207f5d790e665e1cce2272b0b2848c2a137
Author: HonahX <[email protected]>
AuthorDate: Tue Jan 2 02:27:11 2024 +0800
Glue catalog commit table (#140)
* Implement table metadata updater first draft
* fix updater error and add tests
* implement _commit_table for glue
* implement apply_metadata_update which is simpler
* remove old implementation
* re-organize method place
* fix nit
* fix test
* add another test
* clear TODO
* add a combined test
* Fix merge conflict
* update table metadata merged
* implement requirements validation
* change the exception to CommitFailedException
* add docstring
* use regex to parse the metadata version
* fix lint issue
* fix CI issue
* make base_metadata optional and add null check
* make base_metadata optional and add null check
* add integration test
* default skip-archive to true and comments
* refactor tests
* add doc and fix test after merge
* make regex more robust, thanks Fokko!
* Fix review comments, thanks Patrick!
---
pyiceberg/catalog/__init__.py | 46 +++++++++++++-
pyiceberg/catalog/glue.py | 113 ++++++++++++++++++++++++++++-----
tests/catalog/integration_test_glue.py | 33 ++++++++++
tests/catalog/test_glue.py | 41 ++++++++++++
tests/conftest.py | 4 +-
5 files changed, 218 insertions(+), 19 deletions(-)
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 993be87..4cb6c2f 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import logging
+import re
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -74,6 +75,17 @@ URI = "uri"
LOCATION = "location"
EXTERNAL_TABLE = "EXTERNAL_TABLE"
+TABLE_METADATA_FILE_NAME_REGEX = re.compile(
+ r"""
+ (\d+) # version number
+ - # separator
+ ([\w-]{36}) # UUID (36 characters, including hyphens)
+ (?:\.\w+)? # optional codec name
+ \.metadata\.json # file extension
+ """,
+ re.X,
+)
+
class CatalogType(Enum):
REST = "rest"
@@ -587,8 +599,38 @@ class Catalog(ABC):
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
@staticmethod
- def _get_metadata_location(location: str) -> str:
- return f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json"
+ def _get_metadata_location(location: str, new_version: int = 0) -> str:
+ if new_version < 0:
+ raise ValueError(f"Table metadata version: `{new_version}` must be
a non-negative integer")
+ version_str = f"{new_version:05d}"
+ return
f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"
+
+ @staticmethod
+ def _parse_metadata_version(metadata_location: str) -> int:
+ """Parse the version from the metadata location.
+
+ The version is the first part of the file name, before the first dash.
+ For example, the version of the metadata file
+
`s3://bucket/db/tb/metadata/00001-6c97e413-d51b-4538-ac70-12fe2a85cb83.metadata.json`
+ is 1.
+ If the path does not comply with the pattern, the version defaults
to -1, ensuring
+ that the next metadata file is treated as having version 0.
+
+ Args:
+ metadata_location (str): The location of the metadata file.
+
+ Returns:
+ int: The version of the metadata file. -1 if the file name does
not have a valid version string.
+ """
+ file_name = metadata_location.split("/")[-1]
+ if file_name_match :=
TABLE_METADATA_FILE_NAME_REGEX.fullmatch(file_name):
+ try:
+ uuid.UUID(file_name_match.group(2))
+ except ValueError:
+ return -1
+ return int(file_name_match.group(1))
+ else:
+ return -1
def _get_updated_props_and_update_summary(
self, current_properties: Properties, removals: Optional[Set[str]],
updates: Properties
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 723de2f..6cf9462 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -40,6 +40,7 @@ from pyiceberg.catalog import (
ICEBERG,
LOCATION,
METADATA_LOCATION,
+ PREVIOUS_METADATA_LOCATION,
TABLE_TYPE,
Catalog,
Identifier,
@@ -47,6 +48,7 @@ from pyiceberg.catalog import (
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
+ CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchIcebergTableError,
@@ -59,21 +61,40 @@ from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table,
update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
-
-def _construct_parameters(metadata_location: str) -> Properties:
- return {TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}
-
-
-def _construct_create_table_input(table_name: str, metadata_location: str,
properties: Properties) -> TableInputTypeDef:
+# If Glue should skip archiving an old table version when creating a new
version in a commit. By
+# default, Glue archives all old table versions after an UpdateTable call, but
Glue has a default
+# max number of archived table versions (can be increased). So for streaming
use cases with lots
+# of commits, it is recommended to set this value to true.
+GLUE_SKIP_ARCHIVE = "glue.skip-archive"
+GLUE_SKIP_ARCHIVE_DEFAULT = True
+
+
+def _construct_parameters(
+ metadata_location: str, glue_table: Optional[TableTypeDef] = None,
prev_metadata_location: Optional[str] = None
+) -> Properties:
+ new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
+ new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION:
metadata_location})
+ if prev_metadata_location:
+ new_parameters[PREVIOUS_METADATA_LOCATION] = prev_metadata_location
+ return new_parameters
+
+
+def _construct_table_input(
+ table_name: str,
+ metadata_location: str,
+ properties: Properties,
+ glue_table: Optional[TableTypeDef] = None,
+ prev_metadata_location: Optional[str] = None,
+) -> TableInputTypeDef:
table_input: TableInputTypeDef = {
"Name": table_name,
"TableType": EXTERNAL_TABLE,
- "Parameters": _construct_parameters(metadata_location),
+ "Parameters": _construct_parameters(metadata_location, glue_table,
prev_metadata_location),
}
if "Description" in properties:
@@ -177,6 +198,28 @@ class GlueCatalog(Catalog):
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchNamespaceError(f"Database {database_name} does not
exist") from e
+ def _update_glue_table(self, database_name: str, table_name: str,
table_input: TableInputTypeDef, version_id: str) -> None:
+ try:
+ self.glue.update_table(
+ DatabaseName=database_name,
+ TableInput=table_input,
+ SkipArchive=self.properties.get(GLUE_SKIP_ARCHIVE,
GLUE_SKIP_ARCHIVE_DEFAULT),
+ VersionId=version_id,
+ )
+ except self.glue.exceptions.EntityNotFoundException as e:
+ raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name} (Glue table version {version_id})") from e
+ except self.glue.exceptions.ConcurrentModificationException as e:
+ raise CommitFailedException(
+ f"Cannot commit {database_name}.{table_name} because Glue
detected concurrent update to table version {version_id}"
+ ) from e
+
+ def _get_glue_table(self, database_name: str, table_name: str) ->
TableTypeDef:
+ try:
+ load_table_response =
self.glue.get_table(DatabaseName=database_name, Name=table_name)
+ return load_table_response["Table"]
+ except self.glue.exceptions.EntityNotFoundException as e:
+ raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+
def create_table(
self,
identifier: Union[str, Identifier],
@@ -215,7 +258,7 @@ class GlueCatalog(Catalog):
io = load_file_io(properties=self.properties,
location=metadata_location)
self._write_metadata(metadata, io, metadata_location)
- table_input = _construct_create_table_input(table_name,
metadata_location, properties)
+ table_input = _construct_table_input(table_name, metadata_location,
properties)
database_name, table_name =
self.identifier_to_database_and_table(identifier)
self._create_glue_table(database_name=database_name,
table_name=table_name, table_input=table_input)
@@ -247,8 +290,52 @@ class GlueCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the given identifier does not
exist.
+ CommitFailedException: If the commit failed.
"""
- raise NotImplementedError
+ identifier_tuple = self.identifier_to_tuple_without_catalog(
+ tuple(table_request.identifier.namespace.root +
[table_request.identifier.name])
+ )
+ database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple)
+
+ current_glue_table = self._get_glue_table(database_name=database_name,
table_name=table_name)
+ glue_table_version_id = current_glue_table.get("VersionId")
+ if not glue_table_version_id:
+ raise CommitFailedException(f"Cannot commit
{database_name}.{table_name} because Glue table version id is missing")
+ current_table =
self._convert_glue_to_iceberg(glue_table=current_glue_table)
+ base_metadata = current_table.metadata
+
+ # Validate the update requirements
+ for requirement in table_request.requirements:
+ requirement.validate(base_metadata)
+
+ updated_metadata = update_table_metadata(base_metadata,
table_request.updates)
+ if updated_metadata == base_metadata:
+ # no changes, do nothing
+ return CommitTableResponse(metadata=base_metadata,
metadata_location=current_table.metadata_location)
+
+ # write new metadata
+ new_metadata_version =
self._parse_metadata_version(current_table.metadata_location) + 1
+ new_metadata_location =
self._get_metadata_location(current_table.metadata.location,
new_metadata_version)
+ self._write_metadata(updated_metadata, current_table.io,
new_metadata_location)
+
+ update_table_input = _construct_table_input(
+ table_name=table_name,
+ metadata_location=new_metadata_location,
+ properties=current_table.properties,
+ glue_table=current_glue_table,
+ prev_metadata_location=current_table.metadata_location,
+ )
+
+ # Pass `version_id` to implement optimistic locking: it ensures
updates are rejected if concurrent
+ # modifications occur. See more details at
https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
+ self._update_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input=update_table_input,
+ version_id=glue_table_version_id,
+ )
+
+ return CommitTableResponse(metadata=updated_metadata,
metadata_location=new_metadata_location)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
@@ -267,12 +354,8 @@ class GlueCatalog(Catalog):
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
- try:
- load_table_response =
self.glue.get_table(DatabaseName=database_name, Name=table_name)
- except self.glue.exceptions.EntityNotFoundException as e:
- raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
- return self._convert_glue_to_iceberg(load_table_response["Table"])
+ return
self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name,
table_name=table_name))
def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.
diff --git a/tests/catalog/integration_test_glue.py
b/tests/catalog/integration_test_glue.py
index 2689ef1..99f0ada 100644
--- a/tests/catalog/integration_test_glue.py
+++ b/tests/catalog/integration_test_glue.py
@@ -31,6 +31,7 @@ from pyiceberg.exceptions import (
TableAlreadyExistsError,
)
from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType
from tests.conftest import clean_up, get_bucket_name, get_s3_path
# The number of tables/databases used in list_table/namespace test
@@ -61,6 +62,7 @@ def test_create_table(
assert table.identifier == (CATALOG_NAME,) + identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
def test_create_table_with_invalid_location(table_schema_nested: Schema,
table_name: str, database_name: str) -> None:
@@ -82,6 +84,7 @@ def test_create_table_with_default_location(
assert table.identifier == (CATALOG_NAME,) + identifier
metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
def test_create_table_with_invalid_database(test_catalog: Catalog,
table_schema_nested: Schema, table_name: str) -> None:
@@ -105,6 +108,7 @@ def test_load_table(test_catalog: Catalog,
table_schema_nested: Schema, table_na
assert table.identifier == loaded_table.identifier
assert table.metadata_location == loaded_table.metadata_location
assert table.metadata == loaded_table.metadata
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema,
database_name: str, table_list: List[str]) -> None:
@@ -126,6 +130,7 @@ def test_rename_table(
new_table_name = f"rename-{table_name}"
identifier = (database_name, table_name)
table = test_catalog.create_table(identifier, table_schema_nested)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
assert table.identifier == (CATALOG_NAME,) + identifier
new_identifier = (new_database_name, new_table_name)
test_catalog.rename_table(identifier, new_identifier)
@@ -261,3 +266,31 @@ def test_update_namespace_properties(test_catalog:
Catalog, database_name: str)
else:
assert k in update_report.removed
assert "updated test description" ==
test_catalog.load_namespace_properties(database_name)["comment"]
+
+
+def test_commit_table_update_schema(
+ test_catalog: Catalog, table_schema_nested: Schema, database_name: str,
table_name: str
+) -> None:
+ identifier = (database_name, table_name)
+ test_catalog.create_namespace(namespace=database_name)
+ table = test_catalog.create_table(identifier, table_schema_nested)
+ original_table_metadata = table.metadata
+
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+ assert original_table_metadata.current_schema_id == 0
+
+ transaction = table.transaction()
+ update = transaction.update_schema()
+ update.add_column(path="b", field_type=IntegerType())
+ update.commit()
+ transaction.commit_transaction()
+
+ updated_table_metadata = table.metadata
+
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+ assert updated_table_metadata.current_schema_id == 1
+ assert len(updated_table_metadata.schemas) == 2
+ new_schema = next(schema for schema in updated_table_metadata.schemas if
schema.schema_id == 1)
+ assert new_schema
+ assert new_schema == update._apply()
+ assert new_schema.find_field("b").field_type == IntegerType()
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 6aaeb28..2a33c8e 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -31,6 +31,7 @@ from pyiceberg.exceptions import (
TableAlreadyExistsError,
)
from pyiceberg.schema import Schema
+from pyiceberg.types import IntegerType
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
@@ -45,6 +46,7 @@ def test_create_table_with_database_location(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
@mock_glue
@@ -58,6 +60,7 @@ def test_create_table_with_default_warehouse(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
@mock_glue
@@ -73,6 +76,7 @@ def test_create_table_with_given_location(
)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
@mock_glue
@@ -98,6 +102,7 @@ def test_create_table_with_strips(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
@mock_glue
@@ -111,6 +116,7 @@ def test_create_table_with_strips_bucket_root(
table_strip = test_catalog.create_table(identifier, table_schema_nested)
assert table_strip.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table_strip.metadata_location)
+ assert test_catalog._parse_metadata_version(table_strip.metadata_location)
== 0
@mock_glue
@@ -147,6 +153,7 @@ def test_load_table(
table = test_catalog.load_table(identifier)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
@mock_glue
@@ -229,6 +236,7 @@ def test_rename_table(
table = test_catalog.create_table(identifier, table_schema_nested)
assert table.identifier == (catalog_name,) + identifier
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
test_catalog.rename_table(identifier, new_identifier)
new_table = test_catalog.load_table(new_identifier)
assert new_table.identifier == (catalog_name,) + new_identifier
@@ -507,3 +515,36 @@ def test_passing_profile_name() -> None:
mock_session.assert_called_with(**session_properties)
assert test_catalog.glue is mock_session().client()
+
+
+@mock_glue
+def test_commit_table_update_schema(
+ _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested:
Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "glue"
+ identifier = (database_name, table_name)
+ test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint":
moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
+ test_catalog.create_namespace(namespace=database_name)
+ table = test_catalog.create_table(identifier, table_schema_nested)
+ original_table_metadata = table.metadata
+
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+ assert original_table_metadata.current_schema_id == 0
+
+ transaction = table.transaction()
+ update = transaction.update_schema()
+ update.add_column(path="b", field_type=IntegerType())
+ update.commit()
+ transaction.commit_transaction()
+
+ updated_table_metadata = table.metadata
+
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+ assert updated_table_metadata.current_schema_id == 1
+ assert len(updated_table_metadata.schemas) == 2
+ new_schema = next(schema for schema in updated_table_metadata.schemas if
schema.schema_id == 1)
+ assert new_schema
+ assert new_schema == update._apply()
+ assert new_schema.find_field("b").field_type == IntegerType()
diff --git a/tests/conftest.py b/tests/conftest.py
index 1197bf2..e54f1f5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1586,7 +1586,7 @@ def fixture_aws_credentials() -> Generator[None, None,
None]:
os.environ.pop("AWS_DEFAULT_REGION")
-MOTO_SERVER = ThreadedMotoServer(port=5000)
+MOTO_SERVER = ThreadedMotoServer(ip_address="localhost", port=5000)
def pytest_sessionfinish(
@@ -1687,7 +1687,7 @@ BUCKET_NAME = "test_bucket"
TABLE_METADATA_LOCATION_REGEX = re.compile(
r"""s3://test_bucket/my_iceberg_database-[a-z]{20}.db/
my_iceberg_table-[a-z]{20}/metadata/
-
00000-[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.metadata.json""",
+
[0-9]{5}-[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.metadata.json""",
re.X,
)