This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch pyiceberg-0.7.x
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
commit a057849ab64a74200cfb6bfed328461f71d09262 Author: Honah J. <[email protected]> AuthorDate: Wed Jul 31 03:26:06 2024 -0700 Use `self.table_metadata` in transaction (#985) --- pyiceberg/table/__init__.py | 30 +++++++++++++++------------- tests/catalog/test_sql.py | 6 +++--- tests/integration/test_writes/test_writes.py | 6 +++--- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5188152a..a7a2dec2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -331,10 +331,10 @@ class Transaction: if format_version not in {1, 2}: raise ValueError(f"Unsupported table format version: {format_version}") - if format_version < self._table.metadata.format_version: - raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}") + if format_version < self.table_metadata.format_version: + raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}") - if format_version > self._table.metadata.format_version: + if format_version > self.table_metadata.format_version: return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),)) return self @@ -452,7 +452,7 @@ class Transaction: self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive, - name_mapping=self._table.name_mapping(), + name_mapping=self.table_metadata.name_mapping(), ) def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: @@ -489,7 +489,7 @@ class Transaction: ) downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False _check_pyarrow_schema_compatible( - self._table.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) manifest_merge_enabled = 
PropertyUtil.property_as_bool( @@ -504,7 +504,7 @@ class Transaction: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( - table_metadata=self._table.metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io + table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io ) for data_file in data_files: append_files.append_data_file(data_file) @@ -548,7 +548,7 @@ class Transaction: ) downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False _check_pyarrow_schema_compatible( - self._table.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us + self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us ) self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties) @@ -557,7 +557,7 @@ class Transaction: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( - table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io + table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io ) for data_file in data_files: update_snapshot.append_data_file(data_file) @@ -595,7 +595,7 @@ class Transaction: # Check if there are any files that require an actual rewrite of a data file if delete_snapshot.rewrites_needed is True: - bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True) + bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True) preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter) files = self._scan(row_filter=delete_filter).plan_files() @@ -614,7 +614,7 @@ class Transaction: for original_file in files: df = project_table( tasks=[original_file], - table_metadata=self._table.metadata, + 
table_metadata=self.table_metadata, io=self._table.io, row_filter=AlwaysTrue(), projected_schema=self.table_metadata.schema(), @@ -629,7 +629,7 @@ class Transaction: _dataframe_to_data_files( io=self._table.io, df=filtered_df, - table_metadata=self._table.metadata, + table_metadata=self.table_metadata, write_uuid=commit_uuid, counter=counter, ) @@ -658,11 +658,13 @@ class Transaction: Raises: FileNotFoundError: If the file does not exist. """ - if self._table.name_mapping() is None: - self.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self._table.schema().name_mapping.model_dump_json()}) + if self.table_metadata.name_mapping() is None: + self.set_properties(**{ + TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json() + }) with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: data_files = _parquet_files_to_data_files( - table_metadata=self._table.metadata, file_paths=file_paths, io=self._table.io + table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io ) for data_file in data_files: update_snapshot.append_data_file(data_file) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 8046ac7a..2bee387a 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1421,9 +1421,9 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None: with txn.update_schema() as schema_txn: schema_txn.union_by_name(pa_table_with_column.schema) - with txn.update_snapshot().fast_append() as snapshot_update: - for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io): - snapshot_update.append_data_file(data_file) + txn.append(pa_table_with_column) + txn.overwrite(pa_table_with_column) + txn.delete("foo = 'a'") @pytest.mark.parametrize( diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 93999d96..8ea51f4b 
100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -718,9 +718,9 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None with txn.update_schema() as schema_txn: schema_txn.union_by_name(pa_table_with_column.schema) - with txn.update_snapshot().fast_append() as snapshot_update: - for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io): - snapshot_update.append_data_file(data_file) + txn.append(pa_table_with_column) + txn.overwrite(pa_table_with_column) + txn.delete("foo = 'a'") @pytest.mark.integration
