This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bdd4587 Remove `initial_change` when CreateTableTransaction apply 
table updates on an empty metadata (#1219)
3bdd4587 is described below

commit 3bdd458708f3ef41d466ab1f817788330573b4b8
Author: Honah J. <[email protected]>
AuthorDate: Fri Nov 1 11:41:58 2024 -0700

    Remove `initial_change` when CreateTableTransaction apply table updates on 
an empty metadata (#1219)
    
    * make table metadata without validation
    
    * update deletes test
    
    * remove info
    
    * add deprecation message
    
    * revert lib version updates
    
    * remove initial_changes usage in code
    
    * move test to integration
    
    * fix typo
    
    * update error string
---
 pyiceberg/catalog/__init__.py                |  2 +-
 pyiceberg/table/__init__.py                  | 10 ++---
 pyiceberg/table/metadata.py                  | 16 ++++++++
 pyiceberg/table/update/__init__.py           | 39 ++++++++++++------
 pyiceberg/utils/deprecated.py                |  9 +++--
 tests/integration/test_writes/test_writes.py | 59 +++++++++++++++++++++++++++-
 6 files changed, 114 insertions(+), 21 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 7eb8b02d..cc953a88 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -1011,4 +1011,4 @@ class MetastoreCatalog(Catalog, ABC):
         Returns:
             TableMetadata: An empty TableMetadata instance.
         """
-        return TableMetadataV1(location="", last_column_id=-1, schema=Schema())
+        return TableMetadataV1.model_construct(last_column_id=-1, 
schema=Schema())
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 264afd89..0163c425 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -703,22 +703,22 @@ class CreateTableTransaction(Transaction):
 
         schema: Schema = table_metadata.schema()
         self._updates += (
-            AddSchemaUpdate(schema_=schema, 
last_column_id=schema.highest_field_id, initial_change=True),
+            AddSchemaUpdate(schema_=schema, 
last_column_id=schema.highest_field_id),
             SetCurrentSchemaUpdate(schema_id=-1),
         )
 
         spec: PartitionSpec = table_metadata.spec()
         if spec.is_unpartitioned():
-            self._updates += 
(AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC, 
initial_change=True),)
+            self._updates += 
(AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
         else:
-            self._updates += (AddPartitionSpecUpdate(spec=spec, 
initial_change=True),)
+            self._updates += (AddPartitionSpecUpdate(spec=spec),)
         self._updates += (SetDefaultSpecUpdate(spec_id=-1),)
 
         sort_order: Optional[SortOrder] = 
table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
         if sort_order is None or sort_order.is_unsorted:
-            self._updates += 
(AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),)
+            self._updates += 
(AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
         else:
-            self._updates += (AddSortOrderUpdate(sort_order=sort_order, 
initial_change=True),)
+            self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
         self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)
 
         self._updates += (
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 1fea3301..8173bb2c 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -587,5 +587,21 @@ class TableMetadataUtil:
         else:
             raise ValidationError(f"Unknown format version: {format_version}")
 
+    @staticmethod
+    def _construct_without_validation(table_metadata: TableMetadata) -> 
TableMetadata:
+        """Construct table metadata from an existing table without performing 
validation.
+
+        This method is useful during a sequence of table updates when the 
model needs to be re-constructed but is not yet ready for validation.
+        """
+        if table_metadata.format_version is None:
+            raise ValidationError(f"Missing format-version in TableMetadata: 
{table_metadata}")
+
+        if table_metadata.format_version == 1:
+            return TableMetadataV1.model_construct(**dict(table_metadata))
+        elif table_metadata.format_version == 2:
+            return TableMetadataV2.model_construct(**dict(table_metadata))
+        else:
+            raise ValidationError(f"Unknown format version: 
{table_metadata.format_version}")
+
 
 TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], 
Field(discriminator="format_version")]  # type: ignore
diff --git a/pyiceberg/table/update/__init__.py 
b/pyiceberg/table/update/__init__.py
index 6e14046f..b81a2bf7 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -18,7 +18,6 @@ from __future__ import annotations
 
 import uuid
 from abc import ABC, abstractmethod
-from copy import copy
 from datetime import datetime
 from functools import singledispatch
 from typing import TYPE_CHECKING, Any, Dict, Generic, List, Literal, Optional, 
Tuple, TypeVar, Union
@@ -45,6 +44,7 @@ from pyiceberg.types import (
     transform_dict_value_to_str,
 )
 from pyiceberg.utils.datetime import datetime_to_millis
+from pyiceberg.utils.deprecated import deprecation_notice
 from pyiceberg.utils.properties import property_as_int
 
 if TYPE_CHECKING:
@@ -90,7 +90,13 @@ class AddSchemaUpdate(IcebergBaseModel):
     # This field is required: https://github.com/apache/iceberg/pull/7445
     last_column_id: int = Field(alias="last-column-id")
 
-    initial_change: bool = Field(default=False, exclude=True)
+    initial_change: bool = Field(
+        default=False,
+        exclude=True,
+        deprecated=deprecation_notice(
+            deprecated_in="0.8.0", removed_in="0.9.0", 
help_message="CreateTableTransaction can work without this field"
+        ),
+    )
 
 
 class SetCurrentSchemaUpdate(IcebergBaseModel):
@@ -104,7 +110,13 @@ class AddPartitionSpecUpdate(IcebergBaseModel):
     action: Literal["add-spec"] = Field(default="add-spec")
     spec: PartitionSpec
 
-    initial_change: bool = Field(default=False, exclude=True)
+    initial_change: bool = Field(
+        default=False,
+        exclude=True,
+        deprecated=deprecation_notice(
+            deprecated_in="0.8.0", removed_in="0.9.0", 
help_message="CreateTableTransaction can work without this field"
+        ),
+    )
 
 
 class SetDefaultSpecUpdate(IcebergBaseModel):
@@ -118,7 +130,13 @@ class AddSortOrderUpdate(IcebergBaseModel):
     action: Literal["add-sort-order"] = Field(default="add-sort-order")
     sort_order: SortOrder = Field(alias="sort-order")
 
-    initial_change: bool = Field(default=False, exclude=True)
+    initial_change: bool = Field(
+        default=False,
+        exclude=True,
+        deprecated=deprecation_notice(
+            deprecated_in="0.8.0", removed_in="0.9.0", 
help_message="CreateTableTransaction can work without this field"
+        ),
+    )
 
 
 class SetDefaultSortOrderUpdate(IcebergBaseModel):
@@ -267,11 +285,10 @@ def _(
     elif update.format_version == base_metadata.format_version:
         return base_metadata
 
-    updated_metadata_data = copy(base_metadata.model_dump())
-    updated_metadata_data["format-version"] = update.format_version
+    updated_metadata = base_metadata.model_copy(update={"format_version": 
update.format_version})
 
     context.add_update(update)
-    return TableMetadataUtil.parse_obj(updated_metadata_data)
+    return TableMetadataUtil._construct_without_validation(updated_metadata)
 
 
 @_apply_table_update.register(SetPropertiesUpdate)
@@ -306,7 +323,7 @@ def _(update: AddSchemaUpdate, base_metadata: 
TableMetadata, context: _TableMeta
 
     metadata_updates: Dict[str, Any] = {
         "last_column_id": update.last_column_id,
-        "schemas": [update.schema_] if update.initial_change else 
base_metadata.schemas + [update.schema_],
+        "schemas": base_metadata.schemas + [update.schema_],
     }
 
     context.add_update(update)
@@ -336,11 +353,11 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: 
TableMetadata, context: _Ta
 @_apply_table_update.register(AddPartitionSpecUpdate)
 def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: 
_TableMetadataUpdateContext) -> TableMetadata:
     for spec in base_metadata.partition_specs:
-        if spec.spec_id == update.spec.spec_id and not update.initial_change:
+        if spec.spec_id == update.spec.spec_id:
             raise ValueError(f"Partition spec with id {spec.spec_id} already 
exists: {spec}")
 
     metadata_updates: Dict[str, Any] = {
-        "partition_specs": [update.spec] if update.initial_change else 
base_metadata.partition_specs + [update.spec],
+        "partition_specs": base_metadata.partition_specs + [update.spec],
         "last_partition_id": max(
             max([field.field_id for field in update.spec.fields], default=0),
             base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1,
@@ -448,7 +465,7 @@ def _(update: AddSortOrderUpdate, base_metadata: 
TableMetadata, context: _TableM
     context.add_update(update)
     return base_metadata.model_copy(
         update={
-            "sort_orders": [update.sort_order] if update.initial_change else 
base_metadata.sort_orders + [update.sort_order],
+            "sort_orders": base_metadata.sort_orders + [update.sort_order],
         }
     )
 
diff --git a/pyiceberg/utils/deprecated.py b/pyiceberg/utils/deprecated.py
index 92051b4f..188d0ce6 100644
--- a/pyiceberg/utils/deprecated.py
+++ b/pyiceberg/utils/deprecated.py
@@ -41,14 +41,17 @@ def deprecated(deprecated_in: str, removed_in: str, 
help_message: Optional[str]
     return decorator
 
 
+def deprecation_notice(deprecated_in: str, removed_in: str, help_message: 
Optional[str]) -> str:
+    """Return a deprecation notice."""
+    return f"Deprecated in {deprecated_in}, will be removed in {removed_in}. 
{help_message}"
+
+
 def deprecation_message(deprecated_in: str, removed_in: str, help_message: 
Optional[str]) -> None:
     """Mark properties or behaviors as deprecated.
 
     Adding this will result in a warning being emitted.
     """
-    message = f"Deprecated in {deprecated_in}, will be removed in 
{removed_in}. {help_message}"
-
-    _deprecation_warning(message)
+    _deprecation_warning(deprecation_notice(deprecated_in, removed_in, 
help_message))
 
 
 def _deprecation_warning(message: str) -> None:
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index 01744514..e0b788e8 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -45,6 +45,7 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
+from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
 from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
 from pyiceberg.types import (
     DateType,
@@ -738,7 +739,7 @@ def test_write_and_evolve(session_catalog: Catalog, 
format_version: int) -> None
 def test_create_table_transaction(catalog: Catalog, format_version: int) -> 
None:
     if format_version == 1 and isinstance(catalog, RestCatalog):
         pytest.skip(
-            "There is a bug in the REST catalog (maybe server side) that 
prevents create and commit a staged version 1 table"
+            "There is a bug in the REST catalog image 
(https://github.com/apache/iceberg/issues/8756) that prevents create and commit 
a staged version 1 table"
         )
 
     identifier = 
f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
@@ -787,6 +788,62 @@ def test_create_table_transaction(catalog: Catalog, 
format_version: int) -> None
     assert len(tbl.scan().to_arrow()) == 6
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize("catalog", 
[pytest.lazy_fixture("session_catalog_hive"), 
pytest.lazy_fixture("session_catalog")])
+def test_create_table_with_non_default_values(catalog: Catalog, 
table_schema_with_all_types: Schema, format_version: int) -> None:
+    if format_version == 1 and isinstance(catalog, RestCatalog):
+        pytest.skip(
+            "There is a bug in the REST catalog image 
(https://github.com/apache/iceberg/issues/8756) that prevents create and commit 
a staged version 1 table"
+        )
+
+    identifier = 
f"default.arrow_create_table_transaction_with_non_default_values_{catalog.name}_{format_version}"
+    identifier_ref = 
f"default.arrow_create_table_transaction_with_non_default_values_ref_{catalog.name}_{format_version}"
+
+    try:
+        catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    try:
+        catalog.drop_table(identifier=identifier_ref)
+    except NoSuchTableError:
+        pass
+
+    iceberg_spec = PartitionSpec(*[
+        PartitionField(source_id=2, field_id=1001, 
transform=IdentityTransform(), name="integer_partition")
+    ])
+
+    sort_order = SortOrder(*[SortField(source_id=2, 
transform=IdentityTransform(), direction=SortDirection.ASC)])
+
+    txn = catalog.create_table_transaction(
+        identifier=identifier,
+        schema=table_schema_with_all_types,
+        partition_spec=iceberg_spec,
+        sort_order=sort_order,
+        properties={"format-version": format_version},
+    )
+    txn.commit_transaction()
+
+    tbl = catalog.load_table(identifier)
+
+    tbl_ref = catalog.create_table(
+        identifier=identifier_ref,
+        schema=table_schema_with_all_types,
+        partition_spec=iceberg_spec,
+        sort_order=sort_order,
+        properties={"format-version": format_version},
+    )
+
+    assert tbl.format_version == tbl_ref.format_version
+    assert tbl.schema() == tbl_ref.schema()
+    assert tbl.schemas() == tbl_ref.schemas()
+    assert tbl.spec() == tbl_ref.spec()
+    assert tbl.specs() == tbl_ref.specs()
+    assert tbl.sort_order() == tbl_ref.sort_order()
+    assert tbl.sort_orders() == tbl_ref.sort_orders()
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_table_properties_int_value(

Reply via email to