This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch pyiceberg-0.6.x
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/pyiceberg-0.6.x by this push:
     new f0bb2030 [Bug Fix] cast None current-snapshot-id as -1 for Backwards 
Compatibility (#473) (#557)
f0bb2030 is described below

commit f0bb203095fb463f4be1d050d4c1594f642d56d1
Author: Honah J <[email protected]>
AuthorDate: Thu Mar 28 23:44:27 2024 -0700

    [Bug Fix] cast None current-snapshot-id as -1 for Backwards Compatibility 
(#473) (#557)
    
    Backport to 0.6.1
    
    Co-authored-by: Sung Yun <[email protected]>
---
 mkdocs/docs/configuration.md  |  4 ++++
 pyiceberg/serializers.py      |  6 +++++-
 pyiceberg/table/metadata.py   |  9 +++++++-
 pyiceberg/utils/concurrent.py | 11 +---------
 pyiceberg/utils/config.py     | 17 +++++++++++++++
 tests/test_serializers.py     | 50 +++++++++++++++++++++++++++++++++++++++++++
 tests/utils/test_config.py    | 17 +++++++++++++++
 7 files changed, 102 insertions(+), 12 deletions(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index d0c71b59..3684082a 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -269,3 +269,7 @@ catalog:
 # Concurrency
 
 PyIceberg uses multiple threads to parallelize operations. The number of 
workers can be configured by supplying a `max-workers` entry in the 
configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment 
variable. The default value depends on the system hardware and Python version. 
See [the Python 
documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor)
 for more details.
+
+# Backward Compatibility
+
+Previous versions of Java (`<1.4.0`) implementations incorrectly assume the 
optional attribute `current-snapshot-id` to be a required attribute in 
TableMetadata. This means that if `current-snapshot-id` is missing in the 
metadata file (e.g. on table creation), the application will throw an exception 
without being able to load the table. This assumption has been corrected in 
more recent Iceberg versions. However, it is possible to force PyIceberg to 
create a table with a metadata file tha [...]
diff --git a/pyiceberg/serializers.py b/pyiceberg/serializers.py
index 6a580ead..e2994884 100644
--- a/pyiceberg/serializers.py
+++ b/pyiceberg/serializers.py
@@ -24,6 +24,7 @@ from typing import Callable
 from pyiceberg.io import InputFile, InputStream, OutputFile
 from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
 from pyiceberg.typedef import UTF8
+from pyiceberg.utils.config import Config
 
 GZIP = "gzip"
 
@@ -127,6 +128,9 @@ class ToOutputFile:
             overwrite (bool): Where to overwrite the file if it already 
exists. Defaults to `False`.
         """
         with output_file.create(overwrite=overwrite) as output_stream:
-            json_bytes = metadata.model_dump_json().encode(UTF8)
+            # We need to serialize None values, in order to dump `None` 
current-snapshot-id as `-1`
+            exclude_none = False if 
Config().get_bool("legacy-current-snapshot-id") else True
+
+            json_bytes = 
metadata.model_dump_json(exclude_none=exclude_none).encode(UTF8)
             json_bytes = 
Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
             output_stream.write(json_bytes)
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index d5e08cf9..b604827c 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -28,7 +28,7 @@ from typing import (
     Union,
 )
 
-from pydantic import Field, model_validator
+from pydantic import Field, field_serializer, model_validator
 from pydantic import ValidationError as PydanticValidationError
 from typing_extensions import Annotated
 
@@ -49,6 +49,7 @@ from pyiceberg.typedef import (
     IcebergRootModel,
     Properties,
 )
+from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import datetime_to_millis
 
 CURRENT_SNAPSHOT_ID = "current-snapshot-id"
@@ -226,6 +227,12 @@ class TableMetadataCommonFields(IcebergBaseModel):
         """Get the schema by schema_id."""
         return next((schema for schema in self.schemas if schema.schema_id == 
schema_id), None)
 
+    @field_serializer('current_snapshot_id')
+    def serialize_current_snapshot_id(self, current_snapshot_id: 
Optional[int]) -> Optional[int]:
+        if current_snapshot_id is None and 
Config().get_bool("legacy-current-snapshot-id"):
+            return -1
+        return current_snapshot_id
+
 
 class TableMetadataV1(TableMetadataCommonFields, IcebergBaseModel):
     """Represents version 1 of the Table Metadata.
diff --git a/pyiceberg/utils/concurrent.py b/pyiceberg/utils/concurrent.py
index f6c0a23a..805599bf 100644
--- a/pyiceberg/utils/concurrent.py
+++ b/pyiceberg/utils/concurrent.py
@@ -37,13 +37,4 @@ class ExecutorFactory:
     @staticmethod
     def max_workers() -> Optional[int]:
         """Return the max number of workers configured."""
-        config = Config()
-        val = config.config.get("max-workers")
-
-        if val is None:
-            return None
-
-        try:
-            return int(val)  # type: ignore
-        except ValueError as err:
-            raise ValueError(f"Max workers should be an integer or left unset. 
Current value: {val}") from err
+        return Config().get_int("max-workers")
diff --git a/pyiceberg/utils/config.py b/pyiceberg/utils/config.py
index e0380054..8b1b81d3 100644
--- a/pyiceberg/utils/config.py
+++ b/pyiceberg/utils/config.py
@@ -16,6 +16,7 @@
 # under the License.
 import logging
 import os
+from distutils.util import strtobool
 from typing import List, Optional
 
 import strictyaml
@@ -154,3 +155,19 @@ class Config:
                 assert isinstance(catalog_conf, dict), f"Configuration path 
catalogs.{catalog_name_lower} needs to be an object"
                 return catalog_conf
         return None
+
+    def get_int(self, key: str) -> Optional[int]:
+        if (val := self.config.get(key)) is not None:
+            try:
+                return int(val)  # type: ignore
+            except ValueError as err:
+                raise ValueError(f"{key} should be an integer or left unset. 
Current value: {val}") from err
+        return None
+
+    def get_bool(self, key: str) -> Optional[bool]:
+        if (val := self.config.get(key)) is not None:
+            try:
+                return strtobool(val)  # type: ignore
+            except ValueError as err:
+                raise ValueError(f"{key} should be a boolean or left unset. 
Current value: {val}") from err
+        return None
diff --git a/tests/test_serializers.py b/tests/test_serializers.py
new file mode 100644
index 00000000..140db027
--- /dev/null
+++ b/tests/test_serializers.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+import uuid
+from typing import Any, Dict
+
+import pytest
+from pytest_mock import MockFixture
+
+from pyiceberg.serializers import ToOutputFile
+from pyiceberg.table import StaticTable
+from pyiceberg.table.metadata import TableMetadataV1
+
+
+def test_legacy_current_snapshot_id(
+    mocker: MockFixture, tmp_path_factory: pytest.TempPathFactory, 
example_table_metadata_no_snapshot_v1: Dict[str, Any]
+) -> None:
+    from pyiceberg.io.pyarrow import PyArrowFileIO
+
+    metadata_location = str(tmp_path_factory.mktemp("metadata") / 
f"{uuid.uuid4()}.metadata.json")
+    metadata = TableMetadataV1(**example_table_metadata_no_snapshot_v1)
+    ToOutputFile.table_metadata(metadata, 
PyArrowFileIO().new_output(location=metadata_location), overwrite=True)
+    static_table = StaticTable.from_metadata(metadata_location)
+    assert static_table.metadata.current_snapshot_id is None
+
+    mocker.patch.dict(os.environ, 
values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})
+
+    ToOutputFile.table_metadata(metadata, 
PyArrowFileIO().new_output(location=metadata_location), overwrite=True)
+    with PyArrowFileIO().new_input(location=metadata_location).open() as 
input_stream:
+        metadata_json_bytes = input_stream.read()
+    assert json.loads(metadata_json_bytes)['current-snapshot-id'] == -1
+    backwards_compatible_static_table = 
StaticTable.from_metadata(metadata_location)
+    assert backwards_compatible_static_table.metadata.current_snapshot_id is 
None
+    assert backwards_compatible_static_table.metadata == static_table.metadata
diff --git a/tests/utils/test_config.py b/tests/utils/test_config.py
index 5e3f72cc..2f15bb56 100644
--- a/tests/utils/test_config.py
+++ b/tests/utils/test_config.py
@@ -76,3 +76,20 @@ def test_merge_config() -> None:
     rhs: RecursiveDict = {"common_key": "xyz789"}
     result = merge_config(lhs, rhs)
     assert result["common_key"] == rhs["common_key"]
+
+
+def test_from_configuration_files_get_typed_value(tmp_path_factory: 
pytest.TempPathFactory) -> None:
+    config_path = str(tmp_path_factory.mktemp("config"))
+    with open(f"{config_path}/.pyiceberg.yaml", "w", encoding=UTF8) as file:
+        yaml_str = as_document({"max-workers": "4", 
"legacy-current-snapshot-id": "True"}).as_yaml()
+        file.write(yaml_str)
+
+    os.environ["PYICEBERG_HOME"] = config_path
+    with pytest.raises(ValueError):
+        Config().get_bool("max-workers")
+
+    with pytest.raises(ValueError):
+        Config().get_int("legacy-current-snapshot-id")
+
+    assert Config().get_bool("legacy-current-snapshot-id")
+    assert Config().get_int("max-workers") == 4

Reply via email to