This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 2acba740 Use `self.table_metadata` in transaction (#985)
2acba740 is described below
commit 2acba7409c1147fda1a59a73b0f1d075b9e8a580
Author: Honah J. <[email protected]>
AuthorDate: Wed Jul 31 03:26:06 2024 -0700
Use `self.table_metadata` in transaction (#985)
---
pyiceberg/table/__init__.py | 30 +++++++++++++++-------------
tests/catalog/test_sql.py | 6 +++---
tests/integration/test_writes/test_writes.py | 6 +++---
3 files changed, 22 insertions(+), 20 deletions(-)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 5188152a..a7a2dec2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -331,10 +331,10 @@ class Transaction:
if format_version not in {1, 2}:
raise ValueError(f"Unsupported table format version: {format_version}")
- if format_version < self._table.metadata.format_version:
- raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
+ if format_version < self.table_metadata.format_version:
+ raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")
- if format_version > self._table.metadata.format_version:
+ if format_version > self.table_metadata.format_version:
return
self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))
return self
@@ -452,7 +452,7 @@ class Transaction:
self,
allow_incompatible_changes=allow_incompatible_changes,
case_sensitive=case_sensitive,
- name_mapping=self._table.name_mapping(),
+ name_mapping=self.table_metadata.name_mapping(),
)
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
@@ -489,7 +489,7 @@ class Transaction:
)
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
- self._table.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)
manifest_merge_enabled = PropertyUtil.property_as_bool(
@@ -504,7 +504,7 @@ class Transaction:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
- table_metadata=self._table.metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
+ table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
)
for data_file in data_files:
append_files.append_data_file(data_file)
@@ -548,7 +548,7 @@ class Transaction:
)
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
- self._table.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
)
self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
@@ -557,7 +557,7 @@ class Transaction:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
- table_metadata=self._table.metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
+ table_metadata=self.table_metadata, write_uuid=update_snapshot.commit_uuid, df=df, io=self._table.io
)
for data_file in data_files:
update_snapshot.append_data_file(data_file)
@@ -595,7 +595,7 @@ class Transaction:
# Check if there are any files that require an actual rewrite of a data file
if delete_snapshot.rewrites_needed is True:
- bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True)
+ bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)
files = self._scan(row_filter=delete_filter).plan_files()
@@ -614,7 +614,7 @@ class Transaction:
for original_file in files:
df = project_table(
tasks=[original_file],
- table_metadata=self._table.metadata,
+ table_metadata=self.table_metadata,
io=self._table.io,
row_filter=AlwaysTrue(),
projected_schema=self.table_metadata.schema(),
@@ -629,7 +629,7 @@ class Transaction:
_dataframe_to_data_files(
io=self._table.io,
df=filtered_df,
- table_metadata=self._table.metadata,
+ table_metadata=self.table_metadata,
write_uuid=commit_uuid,
counter=counter,
)
@@ -658,11 +658,13 @@ class Transaction:
Raises:
FileNotFoundError: If the file does not exist.
"""
- if self._table.name_mapping() is None:
- self.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self._table.schema().name_mapping.model_dump_json()})
+ if self.table_metadata.name_mapping() is None:
+ self.set_properties(**{
+ TableProperties.DEFAULT_NAME_MAPPING: self.table_metadata.schema().name_mapping.model_dump_json()
+ })
with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
data_files = _parquet_files_to_data_files(
- table_metadata=self._table.metadata, file_paths=file_paths, io=self._table.io
+ table_metadata=self.table_metadata, file_paths=file_paths, io=self._table.io
)
for data_file in data_files:
update_snapshot.append_data_file(data_file)
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 8046ac7a..2bee387a 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -1421,9 +1421,9 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
with txn.update_schema() as schema_txn:
schema_txn.union_by_name(pa_table_with_column.schema)
- with txn.update_snapshot().fast_append() as snapshot_update:
- for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io):
- snapshot_update.append_data_file(data_file)
+ txn.append(pa_table_with_column)
+ txn.overwrite(pa_table_with_column)
+ txn.delete("foo = 'a'")
@pytest.mark.parametrize(
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 93999d96..8ea51f4b 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -718,9 +718,9 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None
with txn.update_schema() as schema_txn:
schema_txn.union_by_name(pa_table_with_column.schema)
- with txn.update_snapshot().fast_append() as snapshot_update:
- for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table_with_column, io=tbl.io):
- snapshot_update.append_data_file(data_file)
+ txn.append(pa_table_with_column)
+ txn.overwrite(pa_table_with_column)
+ txn.delete("foo = 'a'")
@pytest.mark.integration