This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new ef33b9d Bug: Rest Catalog update partition-spec and sort-order when
schema is created (#392)
ef33b9d is described below
commit ef33b9db6e45a6f2388a1a3dfba7161df47c9747
Author: Sung Yun <[email protected]>
AuthorDate: Thu Feb 8 02:50:46 2024 -0500
Bug: Rest Catalog update partition-spec and sort-order when schema is
created (#392)
* also update partition spec and sort order when fresh schema is created
* fresh-schema
* create table integrity test
* undo test change
---
pyiceberg/catalog/rest.py | 14 ++++++++------
tests/integration/test_rest_schema.py | 32 ++++++++++++++++++++++++++++++++
2 files changed, 40 insertions(+), 6 deletions(-)
diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py
index 765f04b..b4b9f72 100644
--- a/pyiceberg/catalog/rest.py
+++ b/pyiceberg/catalog/rest.py
@@ -57,7 +57,7 @@ from pyiceberg.exceptions import (
TableAlreadyExistsError,
UnauthorizedError,
)
-from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table import (
CommitTableRequest,
@@ -66,7 +66,7 @@ from pyiceberg.table import (
TableIdentifier,
TableMetadata,
)
-from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel
if TYPE_CHECKING:
@@ -448,15 +448,17 @@ class RestCatalog(Catalog):
properties: Properties = EMPTY_DICT,
) -> Table:
iceberg_schema = self._convert_schema_if_needed(schema)
- iceberg_schema = assign_fresh_schema_ids(iceberg_schema)
+ fresh_schema = assign_fresh_schema_ids(iceberg_schema)
+ fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
+ fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)
namespace_and_table = self._split_identifier_for_path(identifier)
request = CreateTableRequest(
name=namespace_and_table["table"],
location=location,
- table_schema=iceberg_schema,
- partition_spec=partition_spec,
- write_order=sort_order,
+ table_schema=fresh_schema,
+ partition_spec=fresh_partition_spec,
+ write_order=fresh_sort_order,
properties=properties,
)
serialized_json = request.model_dump_json().encode(UTF8)
diff --git a/tests/integration/test_rest_schema.py b/tests/integration/test_rest_schema.py
index d844e6d..a3320e4 100644
--- a/tests/integration/test_rest_schema.py
+++ b/tests/integration/test_rest_schema.py
@@ -20,8 +20,11 @@ import pytest
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.exceptions import CommitFailedException, NoSuchTableError, ValidationError
+from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema, prune_columns
from pyiceberg.table import Table, UpdateSchema
+from pyiceberg.table.sorting import SortField, SortOrder
+from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -2497,3 +2500,32 @@ def test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None:
assert "Updates in a single commit need to be unique, duplicate: <class 'pyiceberg.table.AddSchemaUpdate'>" in str(
exc_info.value
)
+
+
[email protected]
+def test_create_table_integrity_after_fresh_assignment(catalog: Catalog) -> None:
+ schema = Schema(
+ NestedField(field_id=5, name="col_uuid", field_type=UUIDType(), required=False),
+ NestedField(field_id=4, name="col_fixed", field_type=FixedType(25), required=False),
+ )
+ partition_spec = PartitionSpec(
+ PartitionField(source_id=5, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0
+ )
+ sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform()))
+ tbl_name = "default.test_create_integrity"
+ try:
+ catalog.drop_table(tbl_name)
+ except NoSuchTableError:
+ pass
+ tbl = catalog.create_table(identifier=tbl_name, schema=schema, partition_spec=partition_spec, sort_order=sort_order)
+ expected_schema = Schema(
+ NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
+ NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
+ )
+ expected_spec = PartitionSpec(
+ PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="col_uuid"), spec_id=0
+ )
+ expected_sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
+ assert tbl.schema() == expected_schema
+ assert tbl.spec() == expected_spec
+ assert tbl.sort_order() == expected_sort_order