This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 3085c40 Table Metadata Update: Support SetPropertiesUpdate and
RemovePropertiesUpdate (#266)
3085c40 is described below
commit 3085c404c99ba8c5c8856f21a6c8d63a12ca0113
Author: Honah J <[email protected]>
AuthorDate: Tue Jan 16 10:06:20 2024 -0800
Table Metadata Update: Support SetPropertiesUpdate and
RemovePropertiesUpdate (#266)
* Support table properties update
* Add test for glue catalog
---
pyiceberg/table/__init__.py | 25 ++++++++++++++++
tests/catalog/integration_test_glue.py | 17 +++++++++++
tests/catalog/test_glue.py | 22 ++++++++++++++
tests/table/test_init.py | 53 ++++++++++++++++++++++++++++++++++
4 files changed, 117 insertions(+)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index ebeaa19..5292c51 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -415,6 +415,31 @@ def _(update: UpgradeFormatVersionUpdate, base_metadata:
TableMetadata, context:
return TableMetadataUtil.parse_obj(updated_metadata_data)
+@_apply_table_update.register(SetPropertiesUpdate)
+def _(update: SetPropertiesUpdate, base_metadata: TableMetadata, context:
_TableMetadataUpdateContext) -> TableMetadata:
+ if len(update.updates) == 0:
+ return base_metadata
+
+ properties = dict(base_metadata.properties)
+ properties.update(update.updates)
+
+ context.add_update(update)
+ return base_metadata.model_copy(update={"properties": properties})
+
+
+@_apply_table_update.register(RemovePropertiesUpdate)
+def _(update: RemovePropertiesUpdate, base_metadata: TableMetadata, context:
_TableMetadataUpdateContext) -> TableMetadata:
+ if len(update.removals) == 0:
+ return base_metadata
+
+ properties = dict(base_metadata.properties)
+ for key in update.removals:
+ properties.pop(key)
+
+ context.add_update(update)
+ return base_metadata.model_copy(update={"properties": properties})
+
+
@_apply_table_update.register(AddSchemaUpdate)
def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context:
_TableMetadataUpdateContext) -> TableMetadata:
if update.last_column_id < base_metadata.last_column_id:
diff --git a/tests/catalog/integration_test_glue.py
b/tests/catalog/integration_test_glue.py
index 99f0ada..24401ca 100644
--- a/tests/catalog/integration_test_glue.py
+++ b/tests/catalog/integration_test_glue.py
@@ -294,3 +294,20 @@ def test_commit_table_update_schema(
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
+
+
+def test_commit_table_properties(test_catalog: Catalog, table_schema_nested:
Schema, database_name: str, table_name: str) -> None:
+ identifier = (database_name, table_name)
+ test_catalog.create_namespace(namespace=database_name)
+ table = test_catalog.create_table(identifier=identifier,
schema=table_schema_nested, properties={"test_a": "test_a"})
+
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+
+ transaction = table.transaction()
+ transaction.set_properties(test_a="test_aa", test_b="test_b",
test_c="test_c")
+ transaction.remove_properties("test_b")
+ transaction.commit_transaction()
+
+ updated_table_metadata = table.metadata
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+ assert updated_table_metadata.properties == {"test_a": "test_aa",
"test_c": "test_c"}
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index f84ed4a..bf6d117 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -553,3 +553,25 @@ def test_commit_table_update_schema(
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
+
+
+@mock_glue
+def test_commit_table_properties(
+ _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested:
Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "glue"
+ identifier = (database_name, table_name)
+ test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint":
moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
+ test_catalog.create_namespace(namespace=database_name)
+ table = test_catalog.create_table(identifier=identifier,
schema=table_schema_nested, properties={"test_a": "test_a"})
+
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+
+ transaction = table.transaction()
+ transaction.set_properties(test_a="test_aa", test_b="test_b",
test_c="test_c")
+ transaction.remove_properties("test_b")
+ transaction.commit_transaction()
+
+ updated_table_metadata = table.metadata
+ assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+ assert updated_table_metadata.properties == {"test_a": "test_aa",
"test_c": "test_c"}
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 475814a..d3bbe41 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -50,6 +50,7 @@ from pyiceberg.table import (
AssertLastAssignedPartitionId,
AssertRefSnapshotId,
AssertTableUUID,
+ RemovePropertiesUpdate,
SetPropertiesUpdate,
SetSnapshotRefUpdate,
SnapshotRef,
@@ -529,6 +530,51 @@ def test_add_nested_list_type_column(table_v2: Table) ->
None:
assert new_schema.highest_field_id == 7
+def test_apply_set_properties_update(table_v2: Table) -> None:
+ base_metadata = table_v2.metadata
+
+ new_metadata_no_update = update_table_metadata(base_metadata,
(SetPropertiesUpdate(updates={}),))
+ assert new_metadata_no_update == base_metadata
+
+ new_metadata = update_table_metadata(
+ base_metadata, (SetPropertiesUpdate(updates={"read.split.target.size":
"123", "test_a": "test_a", "test_b": "test_b"}),)
+ )
+
+ assert base_metadata.properties == {"read.split.target.size": "134217728"}
+ assert new_metadata.properties == {"read.split.target.size": "123",
"test_a": "test_a", "test_b": "test_b"}
+
+ new_metadata_add_only = update_table_metadata(new_metadata,
(SetPropertiesUpdate(updates={"test_c": "test_c"}),))
+
+ assert new_metadata_add_only.properties == {
+ "read.split.target.size": "123",
+ "test_a": "test_a",
+ "test_b": "test_b",
+ "test_c": "test_c",
+ }
+
+
+def test_apply_remove_properties_update(table_v2: Table) -> None:
+ base_metadata = update_table_metadata(
+ table_v2.metadata,
+ (SetPropertiesUpdate(updates={"test_a": "test_a", "test_b": "test_b",
"test_c": "test_c", "test_d": "test_d"}),),
+ )
+
+ new_metadata_no_removal = update_table_metadata(base_metadata,
(RemovePropertiesUpdate(removals=[]),))
+
+ assert base_metadata == new_metadata_no_removal
+
+ new_metadata = update_table_metadata(base_metadata,
(RemovePropertiesUpdate(removals=["test_a", "test_c"]),))
+
+ assert base_metadata.properties == {
+ "read.split.target.size": "134217728",
+ "test_a": "test_a",
+ "test_b": "test_b",
+ "test_c": "test_c",
+ "test_d": "test_d",
+ }
+ assert new_metadata.properties == {"read.split.target.size": "134217728",
"test_b": "test_b", "test_d": "test_d"}
+
+
def test_apply_add_schema_update(table_v2: Table) -> None:
transaction = table_v2.transaction()
update = transaction.update_schema()
@@ -625,6 +671,8 @@ def test_update_metadata_with_multiple_updates(table_v1:
Table) -> None:
schema_update_1.add_column(path="b", field_type=IntegerType())
schema_update_1.commit()
+ transaction.set_properties(owner="test", test_a="test_a", test_b="test_b",
test_c="test_c")
+
test_updates = transaction._updates # pylint: disable=W0212
new_snapshot = Snapshot(
@@ -639,6 +687,7 @@ def test_update_metadata_with_multiple_updates(table_v1:
Table) -> None:
test_updates += (
AddSnapshotUpdate(snapshot=new_snapshot),
+ SetPropertiesUpdate(updates={"test_a": "test_a1"}),
SetSnapshotRefUpdate(
ref_name="main",
type="branch",
@@ -647,6 +696,7 @@ def test_update_metadata_with_multiple_updates(table_v1:
Table) -> None:
max_snapshot_age_ms=12312312312,
min_snapshots_to_keep=1,
),
+ RemovePropertiesUpdate(removals=["test_c", "test_b"]),
)
new_metadata = update_table_metadata(base_metadata, test_updates)
@@ -681,6 +731,9 @@ def test_update_metadata_with_multiple_updates(table_v1:
Table) -> None:
max_ref_age_ms=123123123,
)
+ # Set/RemovePropertiesUpdate
+ assert new_metadata.properties == {"owner": "test", "test_a": "test_a1"}
+
def test_metadata_isolation_from_illegal_updates(table_v1: Table) -> None:
base_metadata = table_v1.metadata