This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 3bdd4587 Remove `initial_change` when CreateTableTransaction apply
table updates on an empty metadata (#1219)
3bdd4587 is described below
commit 3bdd458708f3ef41d466ab1f817788330573b4b8
Author: Honah J. <[email protected]>
AuthorDate: Fri Nov 1 11:41:58 2024 -0700
Remove `initial_change` when CreateTableTransaction apply table updates on
an empty metadata (#1219)
* make table metadata without validation
* update deletes test
* remove info
* add deprecation message
* revert lib version updates
* remove initial_changes usage in code
* move test to integration
* fix typo
* update error string
---
pyiceberg/catalog/__init__.py | 2 +-
pyiceberg/table/__init__.py | 10 ++---
pyiceberg/table/metadata.py | 16 ++++++++
pyiceberg/table/update/__init__.py | 39 ++++++++++++------
pyiceberg/utils/deprecated.py | 9 +++--
tests/integration/test_writes/test_writes.py | 59 +++++++++++++++++++++++++++-
6 files changed, 114 insertions(+), 21 deletions(-)
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 7eb8b02d..cc953a88 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -1011,4 +1011,4 @@ class MetastoreCatalog(Catalog, ABC):
Returns:
TableMetadata: An empty TableMetadata instance.
"""
- return TableMetadataV1(location="", last_column_id=-1, schema=Schema())
+ return TableMetadataV1.model_construct(last_column_id=-1,
schema=Schema())
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 264afd89..0163c425 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -703,22 +703,22 @@ class CreateTableTransaction(Transaction):
schema: Schema = table_metadata.schema()
self._updates += (
- AddSchemaUpdate(schema_=schema,
last_column_id=schema.highest_field_id, initial_change=True),
+ AddSchemaUpdate(schema_=schema,
last_column_id=schema.highest_field_id),
SetCurrentSchemaUpdate(schema_id=-1),
)
spec: PartitionSpec = table_metadata.spec()
if spec.is_unpartitioned():
- self._updates +=
(AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC,
initial_change=True),)
+ self._updates +=
(AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
else:
- self._updates += (AddPartitionSpecUpdate(spec=spec,
initial_change=True),)
+ self._updates += (AddPartitionSpecUpdate(spec=spec),)
self._updates += (SetDefaultSpecUpdate(spec_id=-1),)
sort_order: Optional[SortOrder] =
table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
if sort_order is None or sort_order.is_unsorted:
- self._updates +=
(AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),)
+ self._updates +=
(AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
else:
- self._updates += (AddSortOrderUpdate(sort_order=sort_order,
initial_change=True),)
+ self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)
self._updates += (
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 1fea3301..8173bb2c 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -587,5 +587,21 @@ class TableMetadataUtil:
else:
raise ValidationError(f"Unknown format version: {format_version}")
+ @staticmethod
+ def _construct_without_validation(table_metadata: TableMetadata) ->
TableMetadata:
+ """Construct table metadata from an existing table without performing
validation.
+
+ This method is useful during a sequence of table updates when the
model needs to be re-constructed but is not yet ready for validation.
+ """
+ if table_metadata.format_version is None:
+ raise ValidationError(f"Missing format-version in TableMetadata:
{table_metadata}")
+
+ if table_metadata.format_version == 1:
+ return TableMetadataV1.model_construct(**dict(table_metadata))
+ elif table_metadata.format_version == 2:
+ return TableMetadataV2.model_construct(**dict(table_metadata))
+ else:
+ raise ValidationError(f"Unknown format version:
{table_metadata.format_version}")
+
TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2],
Field(discriminator="format_version")] # type: ignore
diff --git a/pyiceberg/table/update/__init__.py
b/pyiceberg/table/update/__init__.py
index 6e14046f..b81a2bf7 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -18,7 +18,6 @@ from __future__ import annotations
import uuid
from abc import ABC, abstractmethod
-from copy import copy
from datetime import datetime
from functools import singledispatch
from typing import TYPE_CHECKING, Any, Dict, Generic, List, Literal, Optional,
Tuple, TypeVar, Union
@@ -45,6 +44,7 @@ from pyiceberg.types import (
transform_dict_value_to_str,
)
from pyiceberg.utils.datetime import datetime_to_millis
+from pyiceberg.utils.deprecated import deprecation_notice
from pyiceberg.utils.properties import property_as_int
if TYPE_CHECKING:
@@ -90,7 +90,13 @@ class AddSchemaUpdate(IcebergBaseModel):
# This field is required: https://github.com/apache/iceberg/pull/7445
last_column_id: int = Field(alias="last-column-id")
- initial_change: bool = Field(default=False, exclude=True)
+ initial_change: bool = Field(
+ default=False,
+ exclude=True,
+ deprecated=deprecation_notice(
+ deprecated_in="0.8.0", removed_in="0.9.0",
help_message="CreateTableTransaction can work without this field"
+ ),
+ )
class SetCurrentSchemaUpdate(IcebergBaseModel):
@@ -104,7 +110,13 @@ class AddPartitionSpecUpdate(IcebergBaseModel):
action: Literal["add-spec"] = Field(default="add-spec")
spec: PartitionSpec
- initial_change: bool = Field(default=False, exclude=True)
+ initial_change: bool = Field(
+ default=False,
+ exclude=True,
+ deprecated=deprecation_notice(
+ deprecated_in="0.8.0", removed_in="0.9.0",
help_message="CreateTableTransaction can work without this field"
+ ),
+ )
class SetDefaultSpecUpdate(IcebergBaseModel):
@@ -118,7 +130,13 @@ class AddSortOrderUpdate(IcebergBaseModel):
action: Literal["add-sort-order"] = Field(default="add-sort-order")
sort_order: SortOrder = Field(alias="sort-order")
- initial_change: bool = Field(default=False, exclude=True)
+ initial_change: bool = Field(
+ default=False,
+ exclude=True,
+ deprecated=deprecation_notice(
+ deprecated_in="0.8.0", removed_in="0.9.0",
help_message="CreateTableTransaction can work without this field"
+ ),
+ )
class SetDefaultSortOrderUpdate(IcebergBaseModel):
@@ -267,11 +285,10 @@ def _(
elif update.format_version == base_metadata.format_version:
return base_metadata
- updated_metadata_data = copy(base_metadata.model_dump())
- updated_metadata_data["format-version"] = update.format_version
+ updated_metadata = base_metadata.model_copy(update={"format_version":
update.format_version})
context.add_update(update)
- return TableMetadataUtil.parse_obj(updated_metadata_data)
+ return TableMetadataUtil._construct_without_validation(updated_metadata)
@_apply_table_update.register(SetPropertiesUpdate)
@@ -306,7 +323,7 @@ def _(update: AddSchemaUpdate, base_metadata:
TableMetadata, context: _TableMeta
metadata_updates: Dict[str, Any] = {
"last_column_id": update.last_column_id,
- "schemas": [update.schema_] if update.initial_change else
base_metadata.schemas + [update.schema_],
+ "schemas": base_metadata.schemas + [update.schema_],
}
context.add_update(update)
@@ -336,11 +353,11 @@ def _(update: SetCurrentSchemaUpdate, base_metadata:
TableMetadata, context: _Ta
@_apply_table_update.register(AddPartitionSpecUpdate)
def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context:
_TableMetadataUpdateContext) -> TableMetadata:
for spec in base_metadata.partition_specs:
- if spec.spec_id == update.spec.spec_id and not update.initial_change:
+ if spec.spec_id == update.spec.spec_id:
raise ValueError(f"Partition spec with id {spec.spec_id} already
exists: {spec}")
metadata_updates: Dict[str, Any] = {
- "partition_specs": [update.spec] if update.initial_change else
base_metadata.partition_specs + [update.spec],
+ "partition_specs": base_metadata.partition_specs + [update.spec],
"last_partition_id": max(
max([field.field_id for field in update.spec.fields], default=0),
base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1,
@@ -448,7 +465,7 @@ def _(update: AddSortOrderUpdate, base_metadata:
TableMetadata, context: _TableM
context.add_update(update)
return base_metadata.model_copy(
update={
- "sort_orders": [update.sort_order] if update.initial_change else
base_metadata.sort_orders + [update.sort_order],
+ "sort_orders": base_metadata.sort_orders + [update.sort_order],
}
)
diff --git a/pyiceberg/utils/deprecated.py b/pyiceberg/utils/deprecated.py
index 92051b4f..188d0ce6 100644
--- a/pyiceberg/utils/deprecated.py
+++ b/pyiceberg/utils/deprecated.py
@@ -41,14 +41,17 @@ def deprecated(deprecated_in: str, removed_in: str,
help_message: Optional[str]
return decorator
+def deprecation_notice(deprecated_in: str, removed_in: str, help_message:
Optional[str]) -> str:
+ """Return a deprecation notice."""
+ return f"Deprecated in {deprecated_in}, will be removed in {removed_in}.
{help_message}"
+
+
def deprecation_message(deprecated_in: str, removed_in: str, help_message:
Optional[str]) -> None:
"""Mark properties or behaviors as deprecated.
Adding this will result in a warning being emitted.
"""
- message = f"Deprecated in {deprecated_in}, will be removed in
{removed_in}. {help_message}"
-
- _deprecation_warning(message)
+ _deprecation_warning(deprecation_notice(deprecated_in, removed_in,
help_message))
def _deprecation_warning(message: str) -> None:
diff --git a/tests/integration/test_writes/test_writes.py
b/tests/integration/test_writes/test_writes.py
index 01744514..e0b788e8 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -45,6 +45,7 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
+from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
from pyiceberg.types import (
DateType,
@@ -738,7 +739,7 @@ def test_write_and_evolve(session_catalog: Catalog,
format_version: int) -> None
def test_create_table_transaction(catalog: Catalog, format_version: int) ->
None:
if format_version == 1 and isinstance(catalog, RestCatalog):
pytest.skip(
- "There is a bug in the REST catalog (maybe server side) that
prevents create and commit a staged version 1 table"
+ "There is a bug in the REST catalog image
(https://github.com/apache/iceberg/issues/8756) that prevents create and commit
a staged version 1 table"
)
identifier =
f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
@@ -787,6 +788,62 @@ def test_create_table_transaction(catalog: Catalog,
format_version: int) -> None
assert len(tbl.scan().to_arrow()) == 6
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("catalog",
[pytest.lazy_fixture("session_catalog_hive"),
pytest.lazy_fixture("session_catalog")])
+def test_create_table_with_non_default_values(catalog: Catalog,
table_schema_with_all_types: Schema, format_version: int) -> None:
+ if format_version == 1 and isinstance(catalog, RestCatalog):
+ pytest.skip(
+ "There is a bug in the REST catalog image
(https://github.com/apache/iceberg/issues/8756) that prevents create and commit
a staged version 1 table"
+ )
+
+ identifier =
f"default.arrow_create_table_transaction_with_non_default_values_{catalog.name}_{format_version}"
+ identifier_ref =
f"default.arrow_create_table_transaction_with_non_default_values_ref_{catalog.name}_{format_version}"
+
+ try:
+ catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ try:
+ catalog.drop_table(identifier=identifier_ref)
+ except NoSuchTableError:
+ pass
+
+ iceberg_spec = PartitionSpec(*[
+ PartitionField(source_id=2, field_id=1001,
transform=IdentityTransform(), name="integer_partition")
+ ])
+
+ sort_order = SortOrder(*[SortField(source_id=2,
transform=IdentityTransform(), direction=SortDirection.ASC)])
+
+ txn = catalog.create_table_transaction(
+ identifier=identifier,
+ schema=table_schema_with_all_types,
+ partition_spec=iceberg_spec,
+ sort_order=sort_order,
+ properties={"format-version": format_version},
+ )
+ txn.commit_transaction()
+
+ tbl = catalog.load_table(identifier)
+
+ tbl_ref = catalog.create_table(
+ identifier=identifier_ref,
+ schema=table_schema_with_all_types,
+ partition_spec=iceberg_spec,
+ sort_order=sort_order,
+ properties={"format-version": format_version},
+ )
+
+ assert tbl.format_version == tbl_ref.format_version
+ assert tbl.schema() == tbl_ref.schema()
+ assert tbl.schemas() == tbl_ref.schemas()
+ assert tbl.spec() == tbl_ref.spec()
+ assert tbl.specs() == tbl_ref.specs()
+ assert tbl.sort_order() == tbl_ref.sort_order()
+ assert tbl.sort_orders() == tbl_ref.sort_orders()
+
+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_properties_int_value(