This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new f72e363b Refactor GlueCatalog `_commit_table` (#653)
f72e363b is described below
commit f72e363b18baa181c998bbdef657982159a22d48
Author: Honah J <[email protected]>
AuthorDate: Thu Apr 25 01:21:33 2024 -0700
Refactor GlueCatalog `_commit_table` (#653)
* refactor _commit_table
* small refactor
* extract common logic of _commit_table
* reformat
---
pyiceberg/catalog/__init__.py | 22 ++++++++++
pyiceberg/catalog/glue.py | 79 +++++++++++++++-------------------
tests/catalog/integration_test_glue.py | 14 ++++--
tests/catalog/test_glue.py | 17 ++++++--
4 files changed, 82 insertions(+), 50 deletions(-)
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index f104aa94..18d803fe 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -48,6 +48,7 @@ from pyiceberg.table import (
CreateTableTransaction,
StagedTable,
Table,
+ update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -728,6 +729,27 @@ class MetastoreCatalog(Catalog, ABC):
catalog=self,
)
+ def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
+ for requirement in table_request.requirements:
+ requirement.validate(current_table.metadata if current_table else None)
+
+ updated_metadata = update_table_metadata(
+ base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
+ updates=table_request.updates,
+ enforce_validation=current_table is None,
+ )
+
+ new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
+ new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
+
+ return StagedTable(
+ identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
+ metadata=updated_metadata,
+ metadata_location=new_metadata_location,
+ io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
+ catalog=self,
+ )
+
def _get_updated_props_and_update_summary(
self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
) -> Tuple[PropertiesUpdateSummary, Properties]:
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index c3c2fdaf..275cda7e 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -58,7 +58,6 @@ from pyiceberg.exceptions import (
NoSuchTableError,
TableAlreadyExistsError,
)
-from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
@@ -67,7 +66,6 @@ from pyiceberg.table import (
CommitTableResponse,
PropertyUtil,
Table,
- update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -321,7 +319,7 @@ class GlueCatalog(MetastoreCatalog):
)
metadata_location = properties[METADATA_LOCATION]
- io = load_file_io(properties=self.properties, location=metadata_location)
+ io = self._load_file_io(location=metadata_location)
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
@@ -439,71 +437,64 @@ class GlueCatalog(MetastoreCatalog):
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
+ current_glue_table: Optional[TableTypeDef]
+ glue_table_version_id: Optional[str]
+ current_table: Optional[Table]
try:
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
- # Update the table
glue_table_version_id = current_glue_table.get("VersionId")
+ current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
+ except NoSuchTableError:
+ current_glue_table = None
+ glue_table_version_id = None
+ current_table = None
+
+ updated_staged_table = self._update_and_stage_table(current_table, table_request)
+ if current_table and updated_staged_table.metadata == current_table.metadata:
+ # no changes, do nothing
+ return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
+ self._write_metadata(
+ metadata=updated_staged_table.metadata,
+ io=updated_staged_table.io,
+ metadata_path=updated_staged_table.metadata_location,
+ )
+
+ if current_table:
+ # table exists, update the table
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
- current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
- base_metadata = current_table.metadata
-
- # Validate the update requirements
- for requirement in table_request.requirements:
- requirement.validate(base_metadata)
-
- updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
- if updated_metadata == base_metadata:
- # no changes, do nothing
- return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
-
- # write new metadata
- new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
- new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
- self._write_metadata(updated_metadata, current_table.io, new_metadata_location)
+ # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
+ # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
update_table_input = _construct_table_input(
table_name=table_name,
- metadata_location=new_metadata_location,
- properties=current_table.properties,
- metadata=updated_metadata,
+ metadata_location=updated_staged_table.metadata_location,
+ properties=updated_staged_table.properties,
+ metadata=updated_staged_table.metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)
-
- # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
- # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)
-
- return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
- except NoSuchTableError:
- # Create the table
- updated_metadata = update_table_metadata(
- base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
- )
- new_metadata_version = 0
- new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
- self._write_metadata(
- updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
- )
-
+ else:
+ # table does not exist, create the table
create_table_input = _construct_table_input(
table_name=table_name,
- metadata_location=new_metadata_location,
- properties=updated_metadata.properties,
- metadata=updated_metadata,
+ metadata_location=updated_staged_table.metadata_location,
+ properties=updated_staged_table.properties,
+ metadata=updated_staged_table.metadata,
)
-
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)
- return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
+ return CommitTableResponse(
+ metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
+ )
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py
index 5cd60225..393271c6 100644
--- a/tests/catalog/integration_test_glue.py
+++ b/tests/catalog/integration_test_glue.py
@@ -462,7 +462,9 @@ def test_commit_table_update_schema(
]
-def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
+def test_commit_table_properties(
+ test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"})
@@ -470,13 +472,19 @@ def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Sch
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0
transaction = table.transaction()
- transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
+ transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()
updated_table_metadata = table.metadata
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1
- assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
+ assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}
+
+ table_info = glue.get_table(
+ DatabaseName=database_name,
+ Name=table_name,
+ )
+ assert table_info["Table"]["Description"] == "test_description"
@pytest.mark.parametrize("format_version", [1, 2])
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 8aa49186..7b12261b 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -677,7 +677,12 @@ def test_commit_table_update_schema(
@mock_aws
def test_commit_table_properties(
- _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
+ _glue: boto3.client,
+ _bucket_initialize: None,
+ moto_endpoint_url: str,
+ table_schema_nested: Schema,
+ database_name: str,
+ table_name: str,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
@@ -688,13 +693,19 @@ def test_commit_table_properties(
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
transaction = table.transaction()
- transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
+ transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()
updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
- assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
+ assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}
+
+ table_info = _glue.get_table(
+ DatabaseName=database_name,
+ Name=table_name,
+ )
+ assert table_info["Table"]["Description"] == "test_description"
@mock_aws