This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 6020f24a Add transaction tests to catalog integration tests (#2371)
6020f24a is described below
commit 6020f24a7989808d382ccc3864205f9f1272bdd7
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Thu Oct 9 15:06:54 2025 -0400
Add transaction tests to catalog integration tests (#2371)
Just adding more tests to the catalog tests.
---
tests/integration/test_catalog.py | 148 +++++++++++++++++++++++++++++++++++++-
1 file changed, 147 insertions(+), 1 deletion(-)
diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py
index b57ab983..b7d2cefa 100644
--- a/tests/integration/test_catalog.py
+++ b/tests/integration/test_catalog.py
@@ -27,6 +27,7 @@ from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import (
+ CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
@@ -34,7 +35,12 @@ from pyiceberg.exceptions import (
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE
-from pyiceberg.schema import Schema
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema
+from pyiceberg.table.metadata import INITIAL_SPEC_ID
+from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder
+from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.types import IntegerType, LongType, NestedField, TimestampType, UUIDType
from tests.conftest import clean_up
@@ -259,6 +265,146 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa
assert test_catalog.table_exists((database_name, table_name)) is True
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+ identifier = (database_name, table_name)
+
+ test_catalog.create_namespace(database_name)
+ table = test_catalog.create_table(identifier, test_schema)
+ assert test_catalog.table_exists(identifier)
+
+ expected_schema = Schema(
+ NestedField(1, "VendorID", IntegerType(), False),
+ NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+ NestedField(3, "new_col", IntegerType(), False),
+ )
+
+ expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "new_col"))
+
+ with table.transaction() as transaction:
+ with transaction.update_schema() as update_schema:
+ update_schema.add_column("new_col", IntegerType())
+
+ with transaction.update_spec() as update_spec:
+ update_spec.add_field("new_col", IdentityTransform())
+
+ table = test_catalog.load_table(identifier)
+ assert table.schema().as_struct() == expected_schema.as_struct()
+ assert table.spec().fields == expected_spec.fields
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+ if isinstance(test_catalog, HiveCatalog):
+ pytest.skip("HiveCatalog fails in this test, need to investigate")
+
+ identifier = (database_name, table_name)
+
+ test_catalog.create_namespace(database_name)
+ table = test_catalog.create_table(identifier, test_schema)
+ assert test_catalog.table_exists(identifier)
+
+ original_update = table.update_schema().add_column("new_col", LongType())
+
+ # Update schema concurrently so that the original update fails
+ concurrent_update = test_catalog.load_table(identifier).update_schema().delete_column("VendorID")
+ concurrent_update.commit()
+
+ expected_schema = Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))
+
+ with pytest.raises(CommitFailedException):
+ original_update.commit()
+
+ table = test_catalog.load_table(identifier)
+ assert table.schema().as_struct() == expected_schema.as_struct()
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+ identifier = (database_name, table_name)
+
+ test_catalog.create_namespace(database_name)
+ table_transaction = test_catalog.create_table_transaction(identifier, test_schema)
+ assert not test_catalog.table_exists(identifier)
+
+ table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
+ assert not test_catalog.table_exists(identifier)
+
+ table_transaction.commit_transaction()
+ assert test_catalog.table_exists(identifier)
+
+ table = test_catalog.load_table(identifier)
+ assert table.schema().find_type("new_col").is_primitive
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_create_table_transaction_multiple_schemas(
+ test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, table_name: str, database_name: str
+) -> None:
+ identifier = (database_name, table_name)
+
+ test_catalog.create_namespace(database_name)
+ table_transaction = test_catalog.create_table_transaction(
+ identifier=identifier,
+ schema=test_schema,
+ partition_spec=test_partition_spec,
+ sort_order=SortOrder(SortField(source_id=1)),
+ )
+ assert not test_catalog.table_exists(identifier)
+
+ table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
+ assert not test_catalog.table_exists(identifier)
+
+ table_transaction.update_schema().add_column("new_col_1", UUIDType()).commit()
+ assert not test_catalog.table_exists(identifier)
+
+ table_transaction.update_spec().add_field("new_col", IdentityTransform()).commit()
+ assert not test_catalog.table_exists(identifier)
+
+ # TODO: test replace sort order when available
+
+ expected_schema = Schema(
+ NestedField(1, "VendorID", IntegerType(), False),
+ NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
+ NestedField(3, "new_col", IntegerType(), False),
+ NestedField(4, "new_col_1", UUIDType(), False),
+ )
+
+ expected_spec = PartitionSpec(
+ PartitionField(1, 1000, IdentityTransform(), "VendorID"),
+ PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"),
+ PartitionField(3, 1002, IdentityTransform(), "new_col"),
+ )
+
+ table_transaction.commit_transaction()
+ assert test_catalog.table_exists(identifier)
+
+ table = test_catalog.load_table(identifier)
+ assert table.schema().as_struct() == expected_schema.as_struct()
+ assert table.schema().schema_id == INITIAL_SCHEMA_ID + 2
+ assert table.spec().fields == expected_spec.fields
+ assert table.spec().spec_id == INITIAL_SPEC_ID + 1
+ assert table.sort_order().order_id == INITIAL_SORT_ORDER_ID
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
+ identifier = (database_name, table_name)
+
+ test_catalog.create_namespace(database_name)
+ table = test_catalog.create_table_transaction(identifier=identifier, schema=test_schema)
+ assert not test_catalog.table_exists(identifier)
+
+ test_catalog.create_table(identifier, test_schema)
+ with pytest.raises(CommitFailedException):
+ table.commit_transaction()
+
+
@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None: