This is an automated email from the ASF dual-hosted git repository. fokko pushed a commit to branch pyiceberg-0.9.x in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
commit 562de407d0dbf5e1212f0a541001a12543a645f3 Author: Fokko Driesprong <[email protected]> AuthorDate: Wed Apr 16 20:35:36 2025 +0200 Fix `add_files` with non-identity transforms (#1925) <!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> Found out I broke this myself after doing a `git bisect`: ``` 36d383dcb676ae5ef59c34cc2910d16a8e30a80c is the first bad commit commit 36d383dcb676ae5ef59c34cc2910d16a8e30a80c Author: Fokko Driesprong <[email protected]> Date: Thu Jan 23 07:50:54 2025 +0100 PyArrow: Avoid buffer-overflow by avoid doing a sort (#1555) Second attempt of https://github.com/apache/iceberg-python/pull/1539 This was already being discussed back here: https://github.com/apache/iceberg-python/issues/208#issuecomment-1889891973 This PR changes from doing a sort, and then a single pass over the table to the approach where we determine the unique partition tuples filter on them individually. Fixes https://github.com/apache/iceberg-python/issues/1491 Because the sort caused buffers to be joined where it would overflow in Arrow. I think this is an issue on the Arrow side, and it should automatically break up into smaller buffers. The `combine_chunks` method does this correctly. 
Now: ``` 0.42877754200890195 Run 1 took: 0.2507691659993725 Run 2 took: 0.24833179199777078 Run 3 took: 0.24401691700040828 Run 4 took: 0.2419595829996979 Average runtime of 0.28 seconds ``` Before: ``` Run 0 took: 1.0768639159941813 Run 1 took: 0.8784021250030492 Run 2 took: 0.8486490420036716 Run 3 took: 0.8614017910003895 Run 4 took: 0.8497851670108503 Average runtime of 0.9 seconds ``` So it comes with a nice speedup as well :) --------- Co-authored-by: Kevin Liu <[email protected]> pyiceberg/io/pyarrow.py | 129 ++- pyiceberg/partitioning.py | 39 +- pyiceberg/table/__init__.py | 6 +- pyproject.toml | 1 + tests/benchmark/test_benchmark.py | 72 ++ tests/integration/test_partitioning_key.py | 1299 ++++++++++++++-------------- tests/table/test_locations.py | 2 +- 7 files changed, 805 insertions(+), 743 deletions(-) create mode 100644 tests/benchmark/test_benchmark.py ``` Closes #1917 <!-- In the case of user-facing changes, please add the changelog label. --> --- pyiceberg/io/pyarrow.py | 31 +++++++++++++++++++------------ tests/integration/test_add_files.py | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 90309546..cf7bcc10 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2205,29 +2205,36 @@ class DataFileStatistics: if partition_field.source_id not in self.column_aggregates: return None - if not partition_field.transform.preserves_order: + source_field = schema.find_field(partition_field.source_id) + iceberg_transform = partition_field.transform + + if not iceberg_transform.preserves_order: raise ValueError( f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}" ) - lower_value = partition_record_value( - partition_field=partition_field, - value=self.column_aggregates[partition_field.source_id].current_min, - schema=schema, + 
transform_func = iceberg_transform.transform(source_field.field_type) + + lower_value = transform_func( + partition_record_value( + partition_field=partition_field, + value=self.column_aggregates[partition_field.source_id].current_min, + schema=schema, + ) ) - upper_value = partition_record_value( - partition_field=partition_field, - value=self.column_aggregates[partition_field.source_id].current_max, - schema=schema, + upper_value = transform_func( + partition_record_value( + partition_field=partition_field, + value=self.column_aggregates[partition_field.source_id].current_max, + schema=schema, + ) ) if lower_value != upper_value: raise ValueError( f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}" ) - source_field = schema.find_field(partition_field.source_id) - transform = partition_field.transform.transform(source_field.field_type) - return transform(lower_value) + return lower_value def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record: return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields}) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 87136152..a1480375 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -30,11 +30,12 @@ from pytest_mock.plugin import MockerFixture from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.io import FileIO -from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types +from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, schema_to_pyarrow +from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_large_types from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from 
pyiceberg.table import Table -from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform +from pyiceberg.transforms import BucketTransform, HourTransform, IdentityTransform, MonthTransform from pyiceberg.types import ( BooleanType, DateType, @@ -42,6 +43,7 @@ from pyiceberg.types import ( LongType, NestedField, StringType, + TimestampType, TimestamptzType, ) @@ -850,3 +852,30 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file with pytest.raises(ValueError) as exc_info: tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True) assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value) + + +@pytest.mark.integration +def test_add_files_hour_transform(session_catalog: Catalog) -> None: + identifier = "default.test_add_files_hour_transform" + + schema = Schema(NestedField(1, "hourly", TimestampType())) + schema_arrow = schema_to_pyarrow(schema, include_field_ids=False) + spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=HourTransform(), name="spec_hour")) + + tbl = _create_table(session_catalog, identifier, format_version=1, schema=schema, partition_spec=spec) + + file_path = "s3://warehouse/default/test_add_files_hour_transform/test.parquet" + + from pyiceberg.utils.datetime import micros_to_timestamp + + arrow_table = pa.Table.from_pylist( + [{"hourly": micros_to_timestamp(1743465600155254)}, {"hourly": micros_to_timestamp(1743469198047855)}], + schema=schema_arrow, + ) + + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=schema_arrow) as writer: + writer.write_table(arrow_table) + + tbl.add_files(file_paths=[file_path])
