This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch pyiceberg-0.9.x
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git

commit 562de407d0dbf5e1212f0a541001a12543a645f3
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Apr 16 20:35:36 2025 +0200

    Fix `add_files` with non-identity transforms (#1925)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    <!-- Closes #${GITHUB_ISSUE_ID} -->
    
    Found out I broke this myself after doing a `git bisect`:
    
    ```
    36d383dcb676ae5ef59c34cc2910d16a8e30a80c is the first bad commit
    commit 36d383dcb676ae5ef59c34cc2910d16a8e30a80c
    Author: Fokko Driesprong <[email protected]>
    Date:   Thu Jan 23 07:50:54 2025 +0100
    
        PyArrow: Avoid buffer-overflow by avoid doing a sort (#1555)
    
        Second attempt of https://github.com/apache/iceberg-python/pull/1539
    
        This was already being discussed back here:
        
https://github.com/apache/iceberg-python/issues/208#issuecomment-1889891973
    
        This PR changes the approach from doing a sort followed by a single
        pass over the table to one where we determine the unique partition
        tuples and filter on them individually.
    
        Fixes https://github.com/apache/iceberg-python/issues/1491
    
        The sort caused buffers to be joined in a way that would overflow in
        Arrow. I think this is an issue on the Arrow side, and it should
        automatically break up into smaller buffers. The `combine_chunks`
        method does this correctly.
    
        Now:
    
        ```
        0.42877754200890195
        Run 1 took: 0.2507691659993725
        Run 2 took: 0.24833179199777078
        Run 3 took: 0.24401691700040828
        Run 4 took: 0.2419595829996979
        Average runtime of 0.28 seconds
        ```
    
        Before:
    
        ```
        Run 0 took: 1.0768639159941813
        Run 1 took: 0.8784021250030492
        Run 2 took: 0.8486490420036716
        Run 3 took: 0.8614017910003895
        Run 4 took: 0.8497851670108503
        Average runtime of 0.9 seconds
        ```
    
        So it comes with a nice speedup as well :)
    
        ---------
    
        Co-authored-by: Kevin Liu <[email protected]>
    
     pyiceberg/io/pyarrow.py                    |  129 ++-
     pyiceberg/partitioning.py                  |   39 +-
     pyiceberg/table/__init__.py                |    6 +-
     pyproject.toml                             |    1 +
     tests/benchmark/test_benchmark.py          |   72 ++
     tests/integration/test_partitioning_key.py | 1299 
++++++++++++++--------------
     tests/table/test_locations.py              |    2 +-
     7 files changed, 805 insertions(+), 743 deletions(-)
     create mode 100644 tests/benchmark/test_benchmark.py
    ```
    
    Closes #1917
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
---
 pyiceberg/io/pyarrow.py             | 31 +++++++++++++++++++------------
 tests/integration/test_add_files.py | 33 +++++++++++++++++++++++++++++++--
 2 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 90309546..cf7bcc10 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -2205,29 +2205,36 @@ class DataFileStatistics:
         if partition_field.source_id not in self.column_aggregates:
             return None
 
-        if not partition_field.transform.preserves_order:
+        source_field = schema.find_field(partition_field.source_id)
+        iceberg_transform = partition_field.transform
+
+        if not iceberg_transform.preserves_order:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata for a 
non-linear Partition Field: {partition_field.name} with transform 
{partition_field.transform}"
             )
 
-        lower_value = partition_record_value(
-            partition_field=partition_field,
-            
value=self.column_aggregates[partition_field.source_id].current_min,
-            schema=schema,
+        transform_func = iceberg_transform.transform(source_field.field_type)
+
+        lower_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                
value=self.column_aggregates[partition_field.source_id].current_min,
+                schema=schema,
+            )
         )
-        upper_value = partition_record_value(
-            partition_field=partition_field,
-            
value=self.column_aggregates[partition_field.source_id].current_max,
-            schema=schema,
+        upper_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                
value=self.column_aggregates[partition_field.source_id].current_max,
+                schema=schema,
+            )
         )
         if lower_value != upper_value:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata as there 
are more than one partition values for Partition Field: {partition_field.name}. 
{lower_value=}, {upper_value=}"
             )
 
-        source_field = schema.find_field(partition_field.source_id)
-        transform = 
partition_field.transform.transform(source_field.field_type)
-        return transform(lower_value)
+        return lower_value
 
     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> 
Record:
         return Record(**{field.name: self._partition_value(field, schema) for 
field in partition_spec.fields})
diff --git a/tests/integration/test_add_files.py 
b/tests/integration/test_add_files.py
index 87136152..a1480375 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -30,11 +30,12 @@ from pytest_mock.plugin import MockerFixture
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, 
_pyarrow_schema_ensure_large_types
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, 
schema_to_pyarrow
+from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_large_types
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, 
PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
-from pyiceberg.transforms import BucketTransform, IdentityTransform, 
MonthTransform
+from pyiceberg.transforms import BucketTransform, HourTransform, 
IdentityTransform, MonthTransform
 from pyiceberg.types import (
     BooleanType,
     DateType,
@@ -42,6 +43,7 @@ from pyiceberg.types import (
     LongType,
     NestedField,
     StringType,
+    TimestampType,
     TimestamptzType,
 )
 
@@ -850,3 +852,30 @@ def 
test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=[existing_files_in_table], 
check_duplicate_files=True)
     assert f"Cannot add files that are already referenced by table, files: 
{existing_files_in_table}" in str(exc_info.value)
+
+
[email protected]
+def test_add_files_hour_transform(session_catalog: Catalog) -> None:
+    identifier = "default.test_add_files_hour_transform"
+
+    schema = Schema(NestedField(1, "hourly", TimestampType()))
+    schema_arrow = schema_to_pyarrow(schema, include_field_ids=False)
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
transform=HourTransform(), name="spec_hour"))
+
+    tbl = _create_table(session_catalog, identifier, format_version=1, 
schema=schema, partition_spec=spec)
+
+    file_path = 
"s3://warehouse/default/test_add_files_hour_transform/test.parquet"
+
+    from pyiceberg.utils.datetime import micros_to_timestamp
+
+    arrow_table = pa.Table.from_pylist(
+        [{"hourly": micros_to_timestamp(1743465600155254)}, {"hourly": 
micros_to_timestamp(1743469198047855)}],
+        schema=schema_arrow,
+    )
+
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=schema_arrow) as writer:
+            writer.write_table(arrow_table)
+
+    tbl.add_files(file_paths=[file_path])

Reply via email to