This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 3f574d38 Support partial deletes (#569)
3f574d38 is described below
commit 3f574d389b4b5cd17654638a40963eacf65563f1
Author: Fokko Driesprong <[email protected]>
AuthorDate: Tue Jul 9 11:36:43 2024 +0200
Support partial deletes (#569)
* Add option to delete datafiles
This is done through the Iceberg metadata, resulting
in efficient deletes if the data is partitioned correctly
* Pull in main
* WIP
* Change DataScan to accept Metadata and io
For the partial deletes I want to do a scan on in-memory
metadata. Changing this API allows this.
* fix name-mapping issue
* WIP
* WIP
* Moar tests
* Oops
* Cleanup
* WIP
* WIP
* Fix summary generation
* Last few bits
* Fix the requirement
* Make ruff happy
* Comments, thanks Kevin!
* Comments
* Append rather than truncate
* Fix merge conflicts
* Make the tests pass
* Add another test
* Conflicts
* Add docs (#33)
* docs
* docs
* Add a partitioned overwrite test
* Fix comment
* Skip empty manifests
---------
Co-authored-by: HonahX <[email protected]>
Co-authored-by: Sung Yun <[email protected]>
---
mkdocs/docs/api.md | 21 +-
pyiceberg/io/pyarrow.py | 64 ++-
pyiceberg/manifest.py | 2 +-
pyiceberg/table/__init__.py | 451 +++++++++++++++++----
pyiceberg/table/snapshots.py | 2 +-
tests/catalog/integration_test_glue.py | 3 +-
tests/catalog/test_sql.py | 3 +-
tests/conftest.py | 32 +-
tests/integration/test_deletes.py | 419 +++++++++++++++++++
tests/integration/test_inspect_table.py | 18 +-
tests/integration/test_rest_schema.py | 18 +-
.../test_writes/test_partitioned_writes.py | 20 +-
tests/integration/test_writes/test_writes.py | 107 +++--
tests/table/test_snapshots.py | 4 -
14 files changed, 1025 insertions(+), 139 deletions(-)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0e80b6eb..7386d029 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -331,12 +331,25 @@ df = pa.Table.from_pylist(
table.append(df)
```
-<!-- prettier-ignore-start -->
+You can delete some of the data from the table by calling `tbl.delete()` with
a desired `delete_filter`.
+
+```python
+tbl.delete(delete_filter="city == 'Paris'")
+```
-!!! example "Under development"
- Writing using PyIceberg is still under development. Support for [partial
overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to
[partitioned tables](https://github.com/apache/iceberg-python/issues/208) is
planned and being worked on.
+In the above example, any records where the city field value equals `Paris` will be deleted.
+Running `tbl.scan().to_arrow()` will now yield:
-<!-- prettier-ignore-end -->
+```
+pyarrow.Table
+city: string
+lat: double
+long: double
+----
+city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
+lat: [[52.371807,37.773972,53.11254],[53.21917]]
+long: [[4.896029,-122.431297,6.0989],[6.56667]]
+```
## Inspecting tables
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index e6490ae1..50406972 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -31,6 +31,7 @@ import itertools
import logging
import os
import re
+import uuid
from abc import ABC, abstractmethod
from concurrent.futures import Future
from copy import copy
@@ -126,7 +127,6 @@ from pyiceberg.schema import (
visit,
visit_with_partner,
)
-from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
@@ -159,7 +159,7 @@ from pyiceberg.utils.singleton import Singleton
from pyiceberg.utils.truncate import truncate_upper_bound_binary_string,
truncate_upper_bound_text_string
if TYPE_CHECKING:
- from pyiceberg.table import FileScanTask
+ from pyiceberg.table import FileScanTask, WriteTask
logger = logging.getLogger(__name__)
@@ -1563,6 +1563,8 @@ class
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
_default_mode: str
def __init__(self, schema: Schema, properties: Dict[str, str]):
+ from pyiceberg.table import TableProperties
+
self._schema = schema
self._properties = properties
self._default_mode = self._properties.get(
@@ -1598,6 +1600,8 @@ class
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
return k + v
def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+ from pyiceberg.table import TableProperties
+
column_name = self._schema.find_column_name(self._field_id)
if column_name is None:
return []
@@ -1895,6 +1899,8 @@ def data_file_statistics_from_parquet_metadata(
def write_file(io: FileIO, table_metadata: TableMetadata, tasks:
Iterator[WriteTask]) -> Iterator[DataFile]:
+ from pyiceberg.table import PropertyUtil, TableProperties
+
parquet_writer_kwargs =
_get_parquet_writer_kwargs(table_metadata.properties)
row_group_size = PropertyUtil.property_as_int(
properties=table_metadata.properties,
@@ -2005,6 +2011,8 @@ PYARROW_UNCOMPRESSED_CODEC = "none"
def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+ from pyiceberg.table import PropertyUtil, TableProperties
+
for key_pattern in [
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
TableProperties.PARQUET_PAGE_ROW_LIMIT,
@@ -2042,3 +2050,55 @@ def _get_parquet_writer_kwargs(table_properties:
Properties) -> Dict[str, Any]:
default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
),
}
+
+
+def _dataframe_to_data_files(
+ table_metadata: TableMetadata,
+ df: pa.Table,
+ io: FileIO,
+ write_uuid: Optional[uuid.UUID] = None,
+ counter: Optional[itertools.count[int]] = None,
+) -> Iterable[DataFile]:
+ """Convert a PyArrow table into a DataFile.
+
+ Returns:
+ An iterable that supplies datafiles that represent the table.
+ """
+ from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+
+ counter = counter or itertools.count(0)
+ write_uuid = write_uuid or uuid.uuid4()
+ target_file_size: int = PropertyUtil.property_as_int( # type: ignore #
The property is set with non-None value.
+ properties=table_metadata.properties,
+ property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+ )
+
+ if table_metadata.spec().is_unpartitioned():
+ yield from write_file(
+ io=io,
+ table_metadata=table_metadata,
+ tasks=iter([
+ WriteTask(write_uuid=write_uuid, task_id=next(counter),
record_batches=batches, schema=table_metadata.schema())
+ for batches in bin_pack_arrow_table(df, target_file_size)
+ ]),
+ )
+ else:
+ from pyiceberg.table import _determine_partitions
+
+ partitions = _determine_partitions(spec=table_metadata.spec(),
schema=table_metadata.schema(), arrow_table=df)
+ yield from write_file(
+ io=io,
+ table_metadata=table_metadata,
+ tasks=iter([
+ WriteTask(
+ write_uuid=write_uuid,
+ task_id=next(counter),
+ record_batches=batches,
+ partition_key=partition.partition_key,
+ schema=table_metadata.schema(),
+ )
+ for partition in partitions
+ for batches in
bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
+ ]),
+ )
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 4fd82fec..e6a81d2a 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -341,7 +341,7 @@ class DataFile(Record):
split_offsets: Optional[List[int]]
equality_ids: Optional[List[int]]
sort_order_id: Optional[int]
- spec_id: Optional[int]
+ spec_id: int
def __setattr__(self, name: str, value: Any) -> None:
"""Assign a key/value to a DataFile."""
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 2eec4d30..8eea9859 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -50,19 +50,27 @@ import pyiceberg.expressions.parser as parser
from pyiceberg.conversions import from_bytes
from pyiceberg.exceptions import CommitFailedException, ResolveError,
ValidationError
from pyiceberg.expressions import (
+ AlwaysFalse,
AlwaysTrue,
And,
BooleanExpression,
EqualTo,
+ Not,
+ Or,
Reference,
)
from pyiceberg.expressions.visitors import (
+ ROWS_CANNOT_MATCH,
+ ROWS_MUST_MATCH,
_InclusiveMetricsEvaluator,
+ _StrictMetricsEvaluator,
+ bind,
expression_evaluator,
inclusive_projection,
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.io.pyarrow import _dataframe_to_data_files,
expression_to_pyarrow, project_table
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
@@ -238,6 +246,11 @@ class TableProperties:
WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
+ DELETE_MODE = "write.delete.mode"
+ DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
+ DELETE_MODE_MERGE_ON_READ = "merge-on-read"
+ DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE
+
DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
FORMAT_VERSION = "format-version"
DEFAULT_FORMAT_VERSION = 2
@@ -305,7 +318,13 @@ class Transaction:
requirement.validate(self.table_metadata)
self._updates += updates
- self._requirements += requirements
+
+ # For the requirements, it does not make sense to add a requirement
more than once
+ # For example, you cannot assert that the current schema has two
different IDs
+ existing_requirements = {type(requirement) for requirement in
self._requirements}
+ for new_requirement in requirements:
+ if type(new_requirement) not in existing_requirements:
+ self._requirements = self._requirements + requirements
self.table_metadata = update_table_metadata(self.table_metadata,
updates)
@@ -316,6 +335,14 @@ class Transaction:
return self
+ def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE)
-> DataScan:
+ """Minimal data scan the table with the current state of the
transaction."""
+ return DataScan(
+ table_metadata=self.table_metadata,
+ io=self._table.io,
+ row_filter=row_filter,
+ )
+
def upgrade_table_version(self, format_version: TableVersion) ->
Transaction:
"""Set the table to a certain version.
@@ -499,11 +526,20 @@ class Transaction:
update_snapshot.append_data_file(data_file)
def overwrite(
- self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT
+ self,
+ df: pa.Table,
+ overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
+ snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Shorthand for adding a table overwrite with a PyArrow table to the
transaction.
+ An overwrite may produce zero or more snapshots based on the operation:
+
+ - DELETE: In case existing Parquet files can be dropped completely.
+ - REPLACE: In case existing Parquet files need to be rewritten.
+ - APPEND: In case new data is being inserted into the table.
+
Args:
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
@@ -518,11 +554,12 @@ class Transaction:
if not isinstance(df, pa.Table):
raise ValueError(f"Expected PyArrow table, got: {df}")
- if overwrite_filter != AlwaysTrue():
- raise NotImplementedError("Cannot overwrite a subset of a table")
-
- if len(self._table.spec().fields) > 0:
- raise ValueError("Cannot write to partitioned tables")
+ if unsupported_partitions := [
+ field for field in self.table_metadata.spec().fields if not
field.transform.supports_pyarrow_transform
+ ]:
+ raise ValueError(
+ f"Not all partition types are supported for writes. Following
partitions cannot be written using pyarrow: {unsupported_partitions}."
+ )
_check_schema_compatible(self._table.schema(), other_schema=df.schema)
# cast if the two schemas are compatible but not equal
@@ -530,7 +567,9 @@ class Transaction:
if table_arrow_schema != df.schema:
df = df.cast(table_arrow_schema)
- with
self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as
update_snapshot:
+ self.delete(delete_filter=overwrite_filter,
snapshot_properties=snapshot_properties)
+
+ with
self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as
update_snapshot:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = _dataframe_to_data_files(
@@ -539,6 +578,86 @@ class Transaction:
for data_file in data_files:
update_snapshot.append_data_file(data_file)
+ def delete(self, delete_filter: Union[str, BooleanExpression],
snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+ """
+ Shorthand for deleting records from a table.
+
+ A delete may produce zero or more snapshots based on the operation:
+
+ - DELETE: In case existing Parquet files can be dropped completely.
+ - REPLACE: In case existing Parquet files need to be rewritten
+
+ Args:
+ delete_filter: A boolean expression to delete rows from a table
+ snapshot_properties: Custom properties to be added to the snapshot
summary
+ """
+ if (
+ self.table_metadata.properties.get(TableProperties.DELETE_MODE,
TableProperties.DELETE_MODE_DEFAULT)
+ == TableProperties.DELETE_MODE_MERGE_ON_READ
+ ):
+ warnings.warn("Merge on read is not yet supported, falling back to
copy-on-write")
+
+ if isinstance(delete_filter, str):
+ delete_filter = _parse_row_filter(delete_filter)
+
+ with
self.update_snapshot(snapshot_properties=snapshot_properties).delete() as
delete_snapshot:
+ delete_snapshot.delete_by_predicate(delete_filter)
+
+ # Check if there are any files that require an actual rewrite of a
data file
+ if delete_snapshot.rewrites_needed is True:
+ bound_delete_filter = bind(self._table.schema(), delete_filter,
case_sensitive=True)
+ preserve_row_filter =
expression_to_pyarrow(Not(bound_delete_filter))
+
+ files = self._scan(row_filter=delete_filter).plan_files()
+
+ commit_uuid = uuid.uuid4()
+ counter = itertools.count(0)
+
+ replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
+ # This will load the Parquet file into memory, including:
+ # - Filter out the rows based on the delete filter
+ # - Projecting it to the current schema
+ # - Applying the positional deletes if they are there
+ # When writing
+ # - Apply the latest partition-spec
+ # - And sort order when added
+ for original_file in files:
+ df = project_table(
+ tasks=[original_file],
+ table_metadata=self._table.metadata,
+ io=self._table.io,
+ row_filter=AlwaysTrue(),
+ projected_schema=self.table_metadata.schema(),
+ )
+ filtered_df = df.filter(preserve_row_filter)
+
+ # Only rewrite if there are records being deleted
+ if len(df) != len(filtered_df):
+ replaced_files.append((
+ original_file.file,
+ list(
+ _dataframe_to_data_files(
+ io=self._table.io,
+ df=filtered_df,
+ table_metadata=self._table.metadata,
+ write_uuid=commit_uuid,
+ counter=counter,
+ )
+ ),
+ ))
+
+ if len(replaced_files) > 0:
+ with
self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
+ commit_uuid=commit_uuid
+ ) as overwrite_snapshot:
+ for original_data_file, replaced_data_files in
replaced_files:
+ overwrite_snapshot.delete_data_file(original_data_file)
+ for replaced_data_file in replaced_data_files:
+
overwrite_snapshot.append_data_file(replaced_data_file)
+
+ if not delete_snapshot.files_affected and not
delete_snapshot.rewrites_needed:
+ warnings.warn("Delete operation did not match any records")
+
def add_files(self, file_paths: List[str], snapshot_properties: Dict[str,
str] = EMPTY_DICT) -> None:
"""
Shorthand API for adding files as data files to the table transaction.
@@ -1381,6 +1500,9 @@ class Table:
return self.snapshot_by_id(self.metadata.current_snapshot_id)
return None
+ def snapshots(self) -> List[Snapshot]:
+ return self.metadata.snapshots
+
def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
"""Get the snapshot of this table with the given id, or None if there
is no matching snapshot."""
return self.metadata.snapshot_by_id(snapshot_id)
@@ -1455,11 +1577,20 @@ class Table:
tx.append(df=df, snapshot_properties=snapshot_properties)
def overwrite(
- self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT
+ self,
+ df: pa.Table,
+ overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
+ snapshot_properties: Dict[str, str] = EMPTY_DICT,
) -> None:
"""
Shorthand for overwriting the table with a PyArrow table.
+ An overwrite may produce zero or more snapshots based on the operation:
+
+ - DELETE: In case existing Parquet files can be dropped completely.
+ - REPLACE: In case existing Parquet files need to be rewritten.
+ - APPEND: In case new data is being inserted into the table.
+
Args:
df: The Arrow dataframe that will be used to overwrite the table
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
@@ -1469,6 +1600,19 @@ class Table:
with self.transaction() as tx:
tx.overwrite(df=df, overwrite_filter=overwrite_filter,
snapshot_properties=snapshot_properties)
+ def delete(
+ self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
snapshot_properties: Dict[str, str] = EMPTY_DICT
+ ) -> None:
+ """
+ Shorthand for deleting rows from the table.
+
+ Args:
+ delete_filter: The predicate used to remove rows
+ snapshot_properties: Custom properties to be added to the snapshot
summary
+ """
+ with self.transaction() as tx:
+ tx.delete(delete_filter=delete_filter,
snapshot_properties=snapshot_properties)
+
def add_files(self, file_paths: List[str], snapshot_properties: Dict[str,
str] = EMPTY_DICT) -> None:
"""
Shorthand API for adding files as data files to the table.
@@ -2904,52 +3048,6 @@ def _generate_manifest_list_path(location: str,
snapshot_id: int, attempt: int,
return
f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
-def _dataframe_to_data_files(
- table_metadata: TableMetadata, df: pa.Table, io: FileIO, write_uuid:
Optional[uuid.UUID] = None
-) -> Iterable[DataFile]:
- """Convert a PyArrow table into a DataFile.
-
- Returns:
- An iterable that supplies datafiles that represent the table.
- """
- from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
-
- counter = itertools.count(0)
- write_uuid = write_uuid or uuid.uuid4()
- target_file_size: int = PropertyUtil.property_as_int( # type: ignore #
The property is set with non-None value.
- properties=table_metadata.properties,
- property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
- default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
- )
-
- if len(table_metadata.spec().fields) > 0:
- partitions = _determine_partitions(spec=table_metadata.spec(),
schema=table_metadata.schema(), arrow_table=df)
- yield from write_file(
- io=io,
- table_metadata=table_metadata,
- tasks=iter([
- WriteTask(
- write_uuid=write_uuid,
- task_id=next(counter),
- record_batches=batches,
- partition_key=partition.partition_key,
- schema=table_metadata.schema(),
- )
- for partition in partitions
- for batches in
bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
- ]),
- )
- else:
- yield from write_file(
- io=io,
- table_metadata=table_metadata,
- tasks=iter([
- WriteTask(write_uuid=write_uuid, task_id=next(counter),
record_batches=batches, schema=table_metadata.schema())
- for batches in bin_pack_arrow_table(df, target_file_size)
- ]),
- )
-
-
def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths:
List[str], io: FileIO) -> Iterable[DataFile]:
"""Convert a list files into DataFiles.
@@ -2961,12 +3059,14 @@ def _parquet_files_to_data_files(table_metadata:
TableMetadata, file_paths: List
yield from parquet_files_to_data_files(io=io,
table_metadata=table_metadata, file_paths=iter(file_paths))
-class
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
+class _MergingSnapshotProducer(UpdateTableMetadata[U], Generic[U]):
commit_uuid: uuid.UUID
_operation: Operation
_snapshot_id: int
_parent_snapshot_id: Optional[int]
_added_data_files: List[DataFile]
+ _deleted_data_files: Set[DataFile]
+ _manifest_counter: itertools.count[int]
def __init__(
self,
@@ -2986,12 +3086,18 @@ class
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
snapshot.snapshot_id if (snapshot :=
self._transaction.table_metadata.current_snapshot()) else None
)
self._added_data_files = []
+ self._deleted_data_files = set()
self.snapshot_properties = snapshot_properties
+ self._manifest_counter = itertools.count(0)
- def append_data_file(self, data_file: DataFile) ->
_MergingSnapshotProducer:
+ def append_data_file(self, data_file: DataFile) ->
_MergingSnapshotProducer[U]:
self._added_data_files.append(data_file)
return self
+ def delete_data_file(self, data_file: DataFile) ->
_MergingSnapshotProducer[U]:
+ self._deleted_data_files.add(data_file)
+ return self
+
@abstractmethod
def _deleted_entries(self) -> List[ManifestEntry]: ...
@@ -3002,7 +3108,9 @@ class
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
def _write_added_manifest() -> List[ManifestFile]:
if self._added_data_files:
output_file_location = _new_manifest_path(
- location=self._transaction.table_metadata.location, num=0,
commit_uuid=self.commit_uuid
+ location=self._transaction.table_metadata.location,
+ num=next(self._manifest_counter),
+ commit_uuid=self.commit_uuid,
)
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
@@ -3030,7 +3138,9 @@ class
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
deleted_entries = self._deleted_entries()
if len(deleted_entries) > 0:
output_file_location = _new_manifest_path(
- location=self._transaction.table_metadata.location, num=1,
commit_uuid=self.commit_uuid
+ location=self._transaction.table_metadata.location,
+ num=next(self._manifest_counter),
+ commit_uuid=self.commit_uuid,
)
with write_manifest(
@@ -3070,6 +3180,15 @@ class
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
schema=self._transaction.table_metadata.schema(),
)
+ if len(self._deleted_data_files) > 0:
+ specs = self._transaction.table_metadata.specs()
+ for data_file in self._deleted_data_files:
+ ssc.remove_file(
+ data_file=data_file,
+ partition_spec=specs[data_file.spec_id],
+ schema=self._transaction.table_metadata.schema(),
+ )
+
previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if self._parent_snapshot_id is not None
@@ -3119,11 +3238,153 @@ class
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
snapshot_id=self._snapshot_id,
parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
),
),
- (AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id,
ref="main"),),
+
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id,
ref="main"),),
)
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+ """Will delete manifest entries from the current snapshot based on the
predicate.
+
+ This will produce a DELETE snapshot:
+ Data files were removed and their contents logically deleted and/or
delete
+ files were added to delete rows.
+
+ From the specification
+ """
+
+ _predicate: BooleanExpression
+
+ def __init__(
+ self,
+ operation: Operation,
+ transaction: Transaction,
+ io: FileIO,
+ commit_uuid: Optional[uuid.UUID] = None,
+ snapshot_properties: Dict[str, str] = EMPTY_DICT,
+ ):
+ super().__init__(operation, transaction, io, commit_uuid,
snapshot_properties)
+ self._predicate = AlwaysFalse()
+
+ def _commit(self) -> UpdatesAndRequirements:
+ # Only produce a commit when there is something to delete
+ if self.files_affected:
+ return super()._commit()
+ else:
+ return (), ()
+
+ def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+ schema = self._transaction.table_metadata.schema()
+ spec = self._transaction.table_metadata.specs()[spec_id]
+ project = inclusive_projection(schema, spec)
+ return project(self._predicate)
+
+ @cached_property
+ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+ return KeyDefaultDict(self._build_partition_projection)
+
+ def _build_manifest_evaluator(self, spec_id: int) ->
Callable[[ManifestFile], bool]:
+ schema = self._transaction.table_metadata.schema()
+ spec = self._transaction.table_metadata.specs()[spec_id]
+ return manifest_evaluator(spec, schema,
self.partition_filters[spec_id], case_sensitive=True)
+
+ def delete_by_predicate(self, predicate: BooleanExpression) -> None:
+ self._predicate = Or(self._predicate, predicate)
+
+ @cached_property
+ def _compute_deletes(self) -> Tuple[List[ManifestFile],
List[ManifestEntry], bool]:
+ """Compute all the delete operations and cache the result when nothing changes.
+
+ Returns:
+ - List of existing manifests that are not affected by the delete
operation.
+ - The manifest-entries that are deleted based on the metadata.
+ - Flag indicating that rewrites of data-files are needed.
+ """
+ schema = self._transaction.table_metadata.schema()
+
+ def _copy_with_new_status(entry: ManifestEntry, status:
ManifestEntryStatus) -> ManifestEntry:
+ return ManifestEntry(
+ status=status,
+ snapshot_id=entry.snapshot_id,
+ data_sequence_number=entry.data_sequence_number,
+ file_sequence_number=entry.file_sequence_number,
+ data_file=entry.data_file,
+ )
+
+ manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] =
KeyDefaultDict(self._build_manifest_evaluator)
+ strict_metrics_evaluator = _StrictMetricsEvaluator(schema,
self._predicate, case_sensitive=True).eval
+ inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema,
self._predicate, case_sensitive=True).eval
+
+ existing_manifests = []
+ total_deleted_entries = []
+ partial_rewrites_needed = False
+ self._deleted_data_files = set()
+ if snapshot := self._transaction.table_metadata.current_snapshot():
+ for manifest_file in snapshot.manifests(io=self._io):
+ if manifest_file.content == ManifestContent.DATA:
+ if not
manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+ # If the manifest isn't relevant, we can just keep it
in the manifest-list
+ existing_manifests.append(manifest_file)
+ else:
+ # It is relevant, let's check out the content
+ deleted_entries = []
+ existing_entries = []
+ for entry in
manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+ if strict_metrics_evaluator(entry.data_file) ==
ROWS_MUST_MATCH:
+
deleted_entries.append(_copy_with_new_status(entry,
ManifestEntryStatus.DELETED))
+ self._deleted_data_files.add(entry.data_file)
+ elif inclusive_metrics_evaluator(entry.data_file)
== ROWS_CANNOT_MATCH:
+
existing_entries.append(_copy_with_new_status(entry,
ManifestEntryStatus.EXISTING))
+ else:
+ # Based on the metadata, it is uncertain whether the file can be deleted
+ partial_rewrites_needed = True
+
+ if len(deleted_entries) > 0:
+ total_deleted_entries += deleted_entries
+
+ # Rewrite the manifest
+ if len(existing_entries) > 0:
+ output_file_location = _new_manifest_path(
+
location=self._transaction.table_metadata.location,
+ num=next(self._manifest_counter),
+ commit_uuid=self.commit_uuid,
+ )
+ with write_manifest(
+
format_version=self._transaction.table_metadata.format_version,
+
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+
schema=self._transaction.table_metadata.schema(),
+
output_file=self._io.new_output(output_file_location),
+ snapshot_id=self._snapshot_id,
+ ) as writer:
+ for existing_entry in existing_entries:
+ writer.add_entry(existing_entry)
+
existing_manifests.append(writer.to_manifest_file())
+ # else:
+ # deleted_manifests.append()
+ else:
+ existing_manifests.append(manifest_file)
+ else:
+ existing_manifests.append(manifest_file)
+
+ return existing_manifests, total_deleted_entries,
partial_rewrites_needed
+
+ def _existing_manifests(self) -> List[ManifestFile]:
+ return self._compute_deletes[0]
+
+ def _deleted_entries(self) -> List[ManifestEntry]:
+ return self._compute_deletes[1]
+
+ @property
+ def rewrites_needed(self) -> bool:
+ """Indicate if data files need to be rewritten."""
+ return self._compute_deletes[2]
+
+ @property
+ def files_affected(self) -> bool:
+ """Indicate if any manifest-entries can be dropped."""
+ return len(self._deleted_entries()) > 0
+
+
+class FastAppendFiles(_MergingSnapshotProducer["FastAppendFiles"]):
def _existing_manifests(self) -> List[ManifestFile]:
"""To determine if there are any existing manifest files.
@@ -3152,14 +3413,53 @@ class FastAppendFiles(_MergingSnapshotProducer):
return []
-class OverwriteFiles(_MergingSnapshotProducer):
+class OverwriteFiles(_MergingSnapshotProducer["OverwriteFiles"]):
+ """Overwrites data from the table. This will produce an OVERWRITE snapshot.
+
+ Data and delete files were added and removed in a logical overwrite
operation.
+ """
+
def _existing_manifests(self) -> List[ManifestFile]:
- """To determine if there are any existing manifest files.
+ """Determine if there are any existing manifest files."""
+ existing_files = []
- In the of a full overwrite, all the previous manifests are
- considered deleted.
- """
- return []
+ if snapshot := self._transaction.table_metadata.current_snapshot():
+ for manifest_file in snapshot.manifests(io=self._io):
+ entries = manifest_file.fetch_manifest_entry(io=self._io,
discard_deleted=True)
+ found_deleted_data_files = [entry.data_file for entry in
entries if entry.data_file in self._deleted_data_files]
+
+ if len(found_deleted_data_files) == 0:
+ existing_files.append(manifest_file)
+ else:
+ # We have to rewrite the manifest
+ output_file_location = _new_manifest_path(
+ location=self._transaction.table_metadata.location,
+ num=next(self._manifest_counter),
+ commit_uuid=self.commit_uuid,
+ )
+ if any(entry.data_file not in found_deleted_data_files for
entry in entries):
+ with write_manifest(
+
format_version=self._transaction.table_metadata.format_version,
+ spec=self._transaction.table_metadata.spec(),
+ schema=self._transaction.table_metadata.schema(),
+
output_file=self._io.new_output(output_file_location),
+ snapshot_id=self._snapshot_id,
+ ) as writer:
+ [
+ writer.add_entry(
+ ManifestEntry(
+ status=ManifestEntryStatus.EXISTING,
+ snapshot_id=entry.snapshot_id,
+
data_sequence_number=entry.data_sequence_number,
+
file_sequence_number=entry.file_sequence_number,
+ data_file=entry.data_file,
+ )
+ )
+ for entry in entries
+ if entry.data_file not in
found_deleted_data_files
+ ]
+ existing_files.append(writer.to_manifest_file())
+ return existing_files
def _deleted_entries(self) -> List[ManifestEntry]:
"""To determine if we need to record any deleted entries.
@@ -3186,7 +3486,7 @@ class OverwriteFiles(_MergingSnapshotProducer):
data_file=entry.data_file,
)
for entry in manifest.fetch_manifest_entry(self._io,
discard_deleted=True)
- if entry.data_file.content == DataFileContent.DATA
+ if entry.data_file.content == DataFileContent.DATA and
entry.data_file in self._deleted_data_files
]
list_of_entries = executor.map(_get_entries,
previous_snapshot.manifests(self._io))
@@ -3200,7 +3500,7 @@ class UpdateSnapshot:
_io: FileIO
_snapshot_properties: Dict[str, str]
- def __init__(self, transaction: Transaction, io: FileIO,
snapshot_properties: Dict[str, str]) -> None:
+ def __init__(self, transaction: Transaction, io: FileIO,
snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
self._transaction = transaction
self._io = io
self._snapshot_properties = snapshot_properties
@@ -3210,8 +3510,9 @@ class UpdateSnapshot:
operation=Operation.APPEND, transaction=self._transaction,
io=self._io, snapshot_properties=self._snapshot_properties
)
- def overwrite(self) -> OverwriteFiles:
+ def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) ->
OverwriteFiles:
return OverwriteFiles(
+ commit_uuid=commit_uuid,
operation=Operation.OVERWRITE
if self._transaction.table_metadata.current_snapshot() is not None
else Operation.APPEND,
@@ -3220,6 +3521,14 @@ class UpdateSnapshot:
snapshot_properties=self._snapshot_properties,
)
+ def delete(self) -> DeleteFiles:
+ return DeleteFiles(
+ operation=Operation.DELETE,
+ transaction=self._transaction,
+ io=self._io,
+ snapshot_properties=self._snapshot_properties,
+ )
+
class UpdateSpec(UpdateTableMetadata["UpdateSpec"]):
_transaction: Transaction
@@ -4056,7 +4365,7 @@ def _determine_partitions(spec: PartitionSpec, schema:
Schema, arrow_table: pa.T
{'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse",
"Horse","Brittle stars", "Centipede"]}.
- The algrithm:
+ The algorithm:
Firstly we group the rows into partitions by sorting with sort order
[('n_legs', 'descending'), ('year', 'descending')]
and null_placement of "at_end".
This gives the same table as raw input.
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 842d4252..1ccb0799 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -352,7 +352,7 @@ def _truncate_table_summary(summary: Summary,
previous_summary: Mapping[str, str
def update_snapshot_summaries(
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None,
truncate_full_table: bool = False
) -> Summary:
- if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}:
+ if summary.operation not in {Operation.APPEND, Operation.OVERWRITE,
Operation.DELETE}:
raise ValueError(f"Operation not implemented: {summary.operation}")
if truncate_full_table and summary.operation == Operation.OVERWRITE and
previous_summary is not None:
diff --git a/tests/catalog/integration_test_glue.py
b/tests/catalog/integration_test_glue.py
index 21c41521..c69bc86c 100644
--- a/tests/catalog/integration_test_glue.py
+++ b/tests/catalog/integration_test_glue.py
@@ -33,9 +33,8 @@ from pyiceberg.exceptions import (
NoSuchTableError,
TableAlreadyExistsError,
)
-from pyiceberg.io.pyarrow import schema_to_pyarrow
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.schema import Schema
-from pyiceberg.table import _dataframe_to_data_files
from pyiceberg.types import IntegerType
from tests.conftest import clean_up, get_bucket_name, get_s3_path
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index bb8fcea3..9251d717 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -39,10 +39,9 @@ from pyiceberg.exceptions import (
TableAlreadyExistsError,
)
from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
-from pyiceberg.io.pyarrow import schema_to_pyarrow
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
-from pyiceberg.table import _dataframe_to_data_files
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
NullOrder,
diff --git a/tests/conftest.py b/tests/conftest.py
index d200f3ab..95e1128a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2299,7 +2299,37 @@ def arrow_table_with_null(pa_schema: "pa.Schema") ->
"pa.Table":
"""Pyarrow table with all kinds of columns."""
import pyarrow as pa
- return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
+ return pa.Table.from_pydict(
+ {
+ "bool": [False, None, True],
+ "string": ["a", None, "z"],
+ # Go over the 16 bytes to kick in truncation
+ "string_long": ["a" * 22, None, "z" * 22],
+ "int": [1, None, 9],
+ "long": [1, None, 9],
+ "float": [0.0, None, 0.9],
+ "double": [0.0, None, 0.9],
+ # 'time': [1_000_000, None, 3_000_000], # Example times: 1s,
none, and 3s past midnight #Spark does not support time fields
+ "timestamp": [datetime(2023, 1, 1, 19, 25, 00), None,
datetime(2023, 3, 1, 19, 25, 00)],
+ "timestamptz": [
+ datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
+ None,
+ datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
+ ],
+ "date": [date(2023, 1, 1), None, date(2023, 3, 1)],
+ # Not supported by Spark
+ # 'time': [time(1, 22, 0), None, time(19, 25, 0)],
+ # Not natively supported by Arrow
+ # 'uuid':
[uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None,
uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
+ "binary": [b"\01", None, b"\22"],
+ "fixed": [
+ uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+ None,
+ uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+ ],
+ },
+ schema=pa_schema,
+ )
@pytest.fixture(scope="session")
diff --git a/tests/integration/test_deletes.py
b/tests/integration/test_deletes.py
new file mode 100644
index 00000000..ad3adede
--- /dev/null
+++ b/tests/integration/test_deletes.py
@@ -0,0 +1,419 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+from typing import List
+
+import pyarrow as pa
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.expressions import AlwaysTrue, EqualTo
+from pyiceberg.manifest import ManifestEntryStatus
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation, Summary
+from pyiceberg.types import IntegerType, NestedField
+
+
+def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
+ for sql in sqls:
+ spark.sql(sql)
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_partitioned_table_delete_full_file(spark: SparkSession,
session_catalog: RestCatalog, format_version: int) -> None:
+ identifier = "default.table_partitioned_delete"
+
+ run_spark_commands(
+ spark,
+ [
+ f"DROP TABLE IF EXISTS {identifier}",
+ f"""
+ CREATE TABLE {identifier} (
+ number_partitioned int,
+ number int
+ )
+ USING iceberg
+ PARTITIONED BY (number_partitioned)
+ TBLPROPERTIES('format-version' = {format_version})
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+ """,
+ ],
+ )
+
+ tbl = session_catalog.load_table(identifier)
+ tbl.delete(EqualTo("number_partitioned", 10))
+
+ # No overwrite operation
+ assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()]
== ["append", "append", "delete"]
+ assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11,
11], "number": [20, 30]}
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_partitioned_table_rewrite(spark: SparkSession, session_catalog:
RestCatalog, format_version: int) -> None:
+ identifier = "default.table_partitioned_delete"
+
+ run_spark_commands(
+ spark,
+ [
+ f"DROP TABLE IF EXISTS {identifier}",
+ f"""
+ CREATE TABLE {identifier} (
+ number_partitioned int,
+ number int
+ )
+ USING iceberg
+ PARTITIONED BY (number_partitioned)
+ TBLPROPERTIES('format-version' = {format_version})
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+ """,
+ ],
+ )
+
+ tbl = session_catalog.load_table(identifier)
+ tbl.delete(EqualTo("number", 20))
+
+ # We don't delete a whole partition, so there is only a overwrite
+ assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()]
== ["append", "append", "overwrite"]
+ assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11,
10], "number": [30, 30]}
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_partitioned_table_no_match(spark: SparkSession, session_catalog:
RestCatalog, format_version: int) -> None:
+ identifier = "default.table_partitioned_delete"
+
+ run_spark_commands(
+ spark,
+ [
+ f"DROP TABLE IF EXISTS {identifier}",
+ f"""
+ CREATE TABLE {identifier} (
+ number_partitioned int,
+ number int
+ )
+ USING iceberg
+ PARTITIONED BY (number_partitioned)
+ TBLPROPERTIES('format-version' = {format_version})
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+ """,
+ ],
+ )
+
+ tbl = session_catalog.load_table(identifier)
+ tbl.delete(EqualTo("number_partitioned", 22)) # Does not affect any data
+
+ assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()]
== ["append"]
+ assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10,
10], "number": [20, 30]}
+
+
[email protected]
+def test_delete_partitioned_table_positional_deletes(spark: SparkSession,
session_catalog: RestCatalog) -> None:
+ identifier = "default.table_partitioned_delete"
+
+ run_spark_commands(
+ spark,
+ [
+ f"DROP TABLE IF EXISTS {identifier}",
+ f"""
+ CREATE TABLE {identifier} (
+ number_partitioned int,
+ number int
+ )
+ USING iceberg
+ PARTITIONED BY (number_partitioned)
+ TBLPROPERTIES(
+ 'format-version' = 2,
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read'
+ )
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40)
+ """,
+ # Generate a positional delete
+ f"""
+ DELETE FROM {identifier} WHERE number = 30
+ """,
+ ],
+ )
+
+ tbl = session_catalog.load_table(identifier)
+
+ # Assert that there is just a single Parquet file, that has one merge on
read file
+ files = list(tbl.scan().plan_files())
+ assert len(files) == 1
+ assert len(files[0].delete_files) == 1
+
+ # Will rewrite a data file without the positional delete
+ tbl.delete(EqualTo("number", 40))
+
+ # One positional delete has been added, but an OVERWRITE status is set
+ # https://github.com/apache/iceberg/issues/10122
+ assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()]
== ["append", "overwrite", "overwrite"]
+ assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10],
"number": [20]}
+
+
[email protected]
+def test_overwrite_partitioned_table(spark: SparkSession, session_catalog:
RestCatalog) -> None:
+ identifier = "default.table_partitioned_delete"
+
+ run_spark_commands(
+ spark,
+ [
+ f"DROP TABLE IF EXISTS {identifier}",
+ f"""
+ CREATE TABLE {identifier} (
+ number_partitioned int,
+ number int
+ )
+ USING iceberg
+ PARTITIONED BY (number_partitioned)
+ TBLPROPERTIES(
+ 'format-version' = 2,
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read'
+ )
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (10, 1), (10, 2), (20, 3)
+ """,
+ ],
+ )
+
+ tbl = session_catalog.load_table(identifier)
+
+ files = list(tbl.scan().plan_files())
+ assert len(files) == 2
+
+ arrow_schema = pa.schema([pa.field("number_partitioned", pa.int32()),
pa.field("number", pa.int32())])
+ arrow_tbl = pa.Table.from_pylist(
+ [
+ {"number_partitioned": 10, "number": 4},
+ {"number_partitioned": 10, "number": 5},
+ ],
+ schema=arrow_schema,
+ )
+
+ # Will rewrite a data file without the positional delete
+ tbl.overwrite(arrow_tbl, "number_partitioned == 10")
+
+ # One positional delete has been added, but an OVERWRITE status is set
+ # https://github.com/apache/iceberg/issues/10122
+ assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()]
== ["append", "delete", "append"]
+ assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10,
10, 20], "number": [4, 5, 3]}
+
+
[email protected]
+def test_partitioned_table_positional_deletes_sequence_number(spark:
SparkSession, session_catalog: RestCatalog) -> None:
+ identifier = "default.table_partitioned_delete_sequence_number"
+
+ # This test case is a bit more complex. Here we run a MoR delete on a
file, we make sure that
+ # the manifest gets rewritten (but not the data file with a MoR), and
check if the delete is still there
+ # to assure that the sequence numbers are maintained
+
+ run_spark_commands(
+ spark,
+ [
+ f"DROP TABLE IF EXISTS {identifier}",
+ f"""
+ CREATE TABLE {identifier} (
+ number_partitioned int,
+ number int
+ )
+ USING iceberg
+ PARTITIONED BY (number_partitioned)
+ TBLPROPERTIES(
+ 'format-version' = 2,
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read'
+ )
+ """,
+ f"""
+ INSERT INTO {identifier} VALUES (10, 100), (10, 101), (20, 200),
(20, 201), (20, 202)
+ """,
+ # Generate a positional delete
+ f"""
+ DELETE FROM {identifier} WHERE number = 101
+ """,
+ ],
+ )
+
+ tbl = session_catalog.load_table(identifier)
+
+ files = list(tbl.scan().plan_files())
+ assert len(files) == 2
+
+ # Will rewrite a data file without a positional delete
+ tbl.delete(EqualTo("number", 201))
+
+ # One positional delete has been added, but an OVERWRITE status is set
+ # https://github.com/apache/iceberg/issues/10122
+ snapshots = tbl.snapshots()
+ assert len(snapshots) == 3
+
+ # Snapshots produced by Spark
+ assert [snapshot.summary.operation.value for snapshot in
tbl.snapshots()[0:2]] == ["append", "overwrite"]
+
+ # Will rewrite one parquet file
+ assert snapshots[2].summary == Summary(
+ Operation.OVERWRITE,
+ **{
+ "added-files-size": "1145",
+ "added-data-files": "1",
+ "added-records": "2",
+ "changed-partition-count": "1",
+ "total-files-size": snapshots[2].summary["total-files-size"],
+ "total-delete-files": "0",
+ "total-data-files": "1",
+ "total-position-deletes": "0",
+ "total-records": "2",
+ "total-equality-deletes": "0",
+ "deleted-data-files": "2",
+ "removed-delete-files": "1",
+ "deleted-records": "5",
+ "removed-files-size": snapshots[2].summary["removed-files-size"],
+ "removed-position-deletes": "1",
+ },
+ )
+
+ assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [20,
20, 10], "number": [200, 202, 100]}
+
+
[email protected]
+def test_delete_no_match(session_catalog: RestCatalog) -> None:
+ arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+ arrow_tbl = pa.Table.from_pylist(
+ [
+ {"ints": 1},
+ {"ints": 3},
+ ],
+ schema=arrow_schema,
+ )
+
+ iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+ tbl_identifier = "default.test_delete_no_match"
+
+ try:
+ session_catalog.drop_table(tbl_identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+ tbl.append(arrow_tbl)
+
+ assert [snapshot.summary.operation for snapshot in tbl.snapshots()] ==
[Operation.APPEND]
+
+ tbl.delete("ints == 2") # Only 1 and 3 in the file, but is between the
lower and upper bound
+
+ assert [snapshot.summary.operation for snapshot in tbl.snapshots()] ==
[Operation.APPEND]
+
+
[email protected]
+def test_delete_overwrite(session_catalog: RestCatalog) -> None:
+ arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+ arrow_tbl = pa.Table.from_pylist(
+ [
+ {"ints": 1},
+ {"ints": 2},
+ ],
+ schema=arrow_schema,
+ )
+
+ iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+ tbl_identifier = "default.test_delete_overwrite"
+
+ try:
+ session_catalog.drop_table(tbl_identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+ tbl.append(arrow_tbl)
+
+ assert [snapshot.summary.operation for snapshot in tbl.snapshots()] ==
[Operation.APPEND]
+
+ arrow_tbl_overwrite = pa.Table.from_pylist(
+ [
+ {"ints": 3},
+ {"ints": 4},
+ ],
+ schema=arrow_schema,
+ )
+ tbl.overwrite(arrow_tbl_overwrite, "ints == 2") # Should rewrite one file
+
+ assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [
+ Operation.APPEND,
+ Operation.OVERWRITE,
+ Operation.APPEND,
+ ]
+
+ assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1]
+
+
[email protected]
+def test_delete_truncate(session_catalog: RestCatalog) -> None:
+ arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+ arrow_tbl = pa.Table.from_pylist(
+ [
+ {"ints": 1},
+ ],
+ schema=arrow_schema,
+ )
+
+ iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+ tbl_identifier = "default.test_delete_overwrite"
+
+ try:
+ session_catalog.drop_table(tbl_identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+ tbl.append(arrow_tbl)
+
+ # Effectively a truncate
+ tbl.delete(delete_filter=AlwaysTrue())
+
+ manifests = tbl.current_snapshot().manifests(tbl.io)
+ assert len(manifests) == 1
+
+ entries = manifests[0].fetch_manifest_entry(tbl.io, discard_deleted=False)
+ assert len(entries) == 1
+
+ assert entries[0].status == ManifestEntryStatus.DELETED
diff --git a/tests/integration/test_inspect_table.py
b/tests/integration/test_inspect_table.py
index 834fe83d..d8a83e0d 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -103,13 +103,14 @@ def test_inspect_snapshots(
assert isinstance(snapshot_id.as_py(), int)
assert df["parent_id"][0].as_py() is None
- assert df["parent_id"][1:] == df["snapshot_id"][:2]
+ assert df["parent_id"][1:].to_pylist() ==
df["snapshot_id"][:-1].to_pylist()
- assert [operation.as_py() for operation in df["operation"]] == ["append",
"overwrite", "append"]
+ assert [operation.as_py() for operation in df["operation"]] == ["append",
"delete", "append", "append"]
for manifest_list in df["manifest_list"]:
assert manifest_list.as_py().startswith("s3://")
+ # Append
assert df["summary"][0].as_py() == [
("added-files-size", "5459"),
("added-data-files", "1"),
@@ -122,6 +123,19 @@ def test_inspect_snapshots(
("total-equality-deletes", "0"),
]
+ # Delete
+ assert df["summary"][1].as_py() == [
+ ("removed-files-size", "5459"),
+ ("deleted-data-files", "1"),
+ ("deleted-records", "3"),
+ ("total-data-files", "0"),
+ ("total-delete-files", "0"),
+ ("total-records", "0"),
+ ("total-files-size", "0"),
+ ("total-position-deletes", "0"),
+ ("total-equality-deletes", "0"),
+ ]
+
lhs = spark.table(f"{identifier}.snapshots").toPandas()
rhs = df.to_pandas()
for column in df.column_names:
diff --git a/tests/integration/test_rest_schema.py
b/tests/integration/test_rest_schema.py
index f4ab98a8..644cb805 100644
--- a/tests/integration/test_rest_schema.py
+++ b/tests/integration/test_rest_schema.py
@@ -2512,14 +2512,18 @@ def
test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None:
),
)
- with pytest.raises(CommitFailedException) as exc_info:
- with tbl.transaction() as tr:
- with tr.update_schema() as update:
- update.add_column("bar", field_type=StringType())
- with tr.update_schema() as update:
- update.add_column("baz", field_type=StringType())
+ with tbl.transaction() as tr:
+ with tr.update_schema() as update:
+ update.add_column("bar", field_type=StringType())
+ with tr.update_schema() as update:
+ update.add_column("baz", field_type=StringType())
- assert "CommitFailedException: Requirement failed: current schema changed:
expected id 1 != 0" in str(exc_info.value)
+ assert tbl.schema().schema_id == 2
+ assert tbl.schema() == Schema(
+ NestedField(field_id=1, name="foo", field_type=StringType()),
+ NestedField(field_id=2, name="bar", field_type=StringType()),
+ NestedField(field_id=3, name="baz", field_type=StringType()),
+ )
@pytest.mark.integration
diff --git a/tests/integration/test_writes/test_partitioned_writes.py
b/tests/integration/test_writes/test_partitioned_writes.py
index f6e6e93c..59bb7693 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -38,7 +38,6 @@ from pyiceberg.transforms import (
TruncateTransform,
YearTransform,
)
-from tests.conftest import TEST_DATA_WITH_NULL
from utils import TABLE_SCHEMA, _create_table
@@ -70,7 +69,7 @@ def test_query_filter_null_partitioned(
assert tbl.format_version == format_version, f"Expected v{format_version},
got: v{tbl.format_version}"
df = spark.table(identifier)
assert df.count() == 3, f"Expected 3 total rows for {identifier}"
- for col in TEST_DATA_WITH_NULL.keys():
+ for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is not null").count() == 2, f"Expected 2
non-null rows for {col}"
assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row
for {col} is null"
@@ -81,7 +80,12 @@ def test_query_filter_null_partitioned(
)
@pytest.mark.parametrize("format_version", [1, 2])
def test_query_filter_without_data_partitioned(
- session_catalog: Catalog, spark: SparkSession, arrow_table_without_data:
pa.Table, part_col: str, format_version: int
+ session_catalog: Catalog,
+ spark: SparkSession,
+ arrow_table_without_data: pa.Table,
+ part_col: str,
+ arrow_table_with_null: pa.Table,
+ format_version: int,
) -> None:
# Given
identifier =
f"default.arrow_table_v{format_version}_without_data_partitioned_on_col_{part_col}"
@@ -102,7 +106,7 @@ def test_query_filter_without_data_partitioned(
# Then
assert tbl.format_version == format_version, f"Expected v{format_version},
got: v{tbl.format_version}"
df = spark.table(identifier)
- for col in TEST_DATA_WITH_NULL.keys():
+ for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for
{col}"
assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row
for {col}"
@@ -134,7 +138,7 @@ def test_query_filter_only_nulls_partitioned(
# Then
assert tbl.format_version == format_version, f"Expected v{format_version},
got: v{tbl.format_version}"
df = spark.table(identifier)
- for col in TEST_DATA_WITH_NULL.keys():
+ for col in arrow_table_with_only_nulls.column_names:
assert df.where(f"{col} is null").count() == 2, f"Expected 2 row for
{col}"
assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows
for {col}"
@@ -169,7 +173,7 @@ def test_query_filter_appended_null_partitioned(
# Then
assert tbl.format_version == format_version, f"Expected v{format_version},
got: v{tbl.format_version}"
df = spark.table(identifier)
- for col in TEST_DATA_WITH_NULL.keys():
+ for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is not null").count() == 6, f"Expected 6
non-null rows for {col}"
assert df.where(f"{col} is null").count() == 3, f"Expected 3 null rows
for {col}"
# expecting 6 files: first append with [A], [B], [C], second append with
[A, A], [B, B], [C, C]
@@ -212,7 +216,7 @@ def test_query_filter_v1_v2_append_null(
# Then
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
- for col in TEST_DATA_WITH_NULL.keys(): # type: ignore
+ for col in arrow_table_with_null.column_names: # type: ignore
df = spark.table(identifier)
assert df.where(f"{col} is not null").count() == 4, f"Expected 4
non-null rows for {col}"
assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows
for {col}"
@@ -422,7 +426,7 @@ def test_append_ymd_transform_partitioned(
assert tbl.format_version == format_version, f"Expected v{format_version},
got: v{tbl.format_version}"
df = spark.table(identifier)
assert df.count() == 3, f"Expected 3 total rows for {identifier}"
- for col in TEST_DATA_WITH_NULL.keys():
+ for col in arrow_table_with_null.column_names:
assert df.where(f"{col} is not null").count() == 2, f"Expected 2
non-null rows for {col}"
assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row
for {col} is null"
diff --git a/tests/integration/test_writes/test_writes.py
b/tests/integration/test_writes/test_writes.py
index 4585406c..9a541bc8 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -37,12 +37,12 @@ from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table import TableProperties, _dataframe_to_data_files
+from pyiceberg.table import TableProperties
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import IntegerType, NestedField
-from tests.conftest import TEST_DATA_WITH_NULL
from utils import _create_table
@@ -124,52 +124,55 @@ def test_query_count(spark: SparkSession, format_version:
int) -> None:
@pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_null(spark: SparkSession, col: str, format_version: int)
-> None:
+def test_query_filter_null(spark: SparkSession, arrow_table_with_null:
pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_with_null"
df = spark.table(identifier)
- assert df.where(f"{col} is null").count() == 1, f"Expected 1 row for {col}"
- assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for
{col}"
+ for col in arrow_table_with_null.column_names:
+ assert df.where(f"{col} is null").count() == 1, f"Expected 1 row for
{col}"
+ assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows
for {col}"
@pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_without_data(spark: SparkSession, col: str,
format_version: int) -> None:
+def test_query_filter_without_data(spark: SparkSession, arrow_table_with_null:
pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_without_data"
df = spark.table(identifier)
- assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}"
- assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for
{col}"
+ for col in arrow_table_with_null.column_names:
+ assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for
{col}"
+ assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row
for {col}"
@pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_only_nulls(spark: SparkSession, col: str,
format_version: int) -> None:
+def test_query_filter_only_nulls(spark: SparkSession, arrow_table_with_null:
pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_with_only_nulls"
df = spark.table(identifier)
- assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for
{col}"
- assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for
{col}"
+ for col in arrow_table_with_null.column_names:
+ assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for
{col}"
+ assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row
for {col}"
@pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_appended_null(spark: SparkSession, col: str,
format_version: int) -> None:
+def test_query_filter_appended_null(spark: SparkSession,
arrow_table_with_null: pa.Table, format_version: int) -> None:
identifier = f"default.arrow_table_v{format_version}_appended_with_null"
df = spark.table(identifier)
- assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}"
- assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for
{col}"
+ for col in arrow_table_with_null.column_names:
+ assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for
{col}"
+ assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows
for {col}"
@pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
-def test_query_filter_v1_v2_append_null(spark: SparkSession, col: str) -> None:
+def test_query_filter_v1_v2_append_null(
+ spark: SparkSession,
+ arrow_table_with_null: pa.Table,
+) -> None:
identifier = "default.arrow_table_v1_v2_appended_with_null"
df = spark.table(identifier)
- assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}"
- assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for
{col}"
+ for col in arrow_table_with_null.column_names:
+ assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for
{col}"
+ assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows
for {col}"
@pytest.mark.integration
@@ -187,10 +190,11 @@ def test_summaries(spark: SparkSession, session_catalog:
Catalog, arrow_table_wi
).collect()
operations = [row.operation for row in rows]
- assert operations == ["append", "append", "overwrite"]
+ assert operations == ["append", "append", "delete", "append"]
summaries = [row.summary for row in rows]
+ # Append
assert summaries[0] == {
"added-data-files": "1",
"added-files-size": "5459",
@@ -203,6 +207,7 @@ def test_summaries(spark: SparkSession, session_catalog:
Catalog, arrow_table_wi
"total-records": "3",
}
+ # Append
assert summaries[1] == {
"added-data-files": "1",
"added-files-size": "5459",
@@ -215,13 +220,24 @@ def test_summaries(spark: SparkSession, session_catalog:
Catalog, arrow_table_wi
"total-records": "6",
}
+ # Delete
assert summaries[2] == {
- "added-data-files": "1",
- "added-files-size": "5459",
- "added-records": "3",
"deleted-data-files": "2",
"deleted-records": "6",
"removed-files-size": "10918",
+ "total-data-files": "0",
+ "total-delete-files": "0",
+ "total-equality-deletes": "0",
+ "total-files-size": "0",
+ "total-position-deletes": "0",
+ "total-records": "0",
+ }
+
+ # Overwrite
+ assert summaries[3] == {
+ "added-data-files": "1",
+ "added-files-size": "5459",
+ "added-records": "3",
"total-data-files": "1",
"total-delete-files": "0",
"total-equality-deletes": "0",
@@ -249,9 +265,9 @@ def test_data_files(spark: SparkSession, session_catalog:
Catalog, arrow_table_w
"""
).collect()
- assert [row.added_data_files_count for row in rows] == [1, 1, 0, 1, 1]
+ assert [row.added_data_files_count for row in rows] == [1, 0, 1, 1, 1]
assert [row.existing_data_files_count for row in rows] == [0, 0, 0, 0, 0]
- assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0]
+ assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 0, 0]
@pytest.mark.integration
@@ -556,7 +572,7 @@ def test_summaries_with_only_nulls(
).collect()
operations = [row.operation for row in rows]
- assert operations == ["append", "append", "overwrite"]
+ assert operations == ["append", "append", "delete", "append"]
summaries = [row.summary for row in rows]
@@ -582,14 +598,23 @@ def test_summaries_with_only_nulls(
}
assert summaries[2] == {
+ "deleted-data-files": "1",
+ "deleted-records": "2",
"removed-files-size": "4239",
+ "total-data-files": "0",
+ "total-delete-files": "0",
"total-equality-deletes": "0",
+ "total-files-size": "0",
"total-position-deletes": "0",
- "deleted-data-files": "1",
+ "total-records": "0",
+ }
+
+ assert summaries[3] == {
+ "total-data-files": "0",
"total-delete-files": "0",
+ "total-equality-deletes": "0",
"total-files-size": "0",
- "deleted-records": "2",
- "total-data-files": "0",
+ "total-position-deletes": "0",
"total-records": "0",
}
@@ -812,13 +837,14 @@ def test_inspect_snapshots(
assert isinstance(snapshot_id.as_py(), int)
assert df["parent_id"][0].as_py() is None
- assert df["parent_id"][1:] == df["snapshot_id"][:2]
+ assert df["parent_id"][1:].to_pylist() ==
df["snapshot_id"][:-1].to_pylist()
- assert [operation.as_py() for operation in df["operation"]] == ["append",
"overwrite", "append"]
+ assert [operation.as_py() for operation in df["operation"]] == ["append",
"delete", "append", "append"]
for manifest_list in df["manifest_list"]:
assert manifest_list.as_py().startswith("s3://")
+ # Append
assert df["summary"][0].as_py() == [
("added-files-size", "5459"),
("added-data-files", "1"),
@@ -831,6 +857,19 @@ def test_inspect_snapshots(
("total-equality-deletes", "0"),
]
+ # Delete
+ assert df["summary"][1].as_py() == [
+ ("removed-files-size", "5459"),
+ ("deleted-data-files", "1"),
+ ("deleted-records", "3"),
+ ("total-data-files", "0"),
+ ("total-delete-files", "0"),
+ ("total-records", "0"),
+ ("total-files-size", "0"),
+ ("total-position-deletes", "0"),
+ ("total-equality-deletes", "0"),
+ ]
+
lhs = spark.table(f"{identifier}.snapshots").toPandas()
rhs = df.to_pandas()
for column in df.column_names:
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index fa346405..ff9d92ce 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -314,10 +314,6 @@ def test_invalid_operation() -> None:
update_snapshot_summaries(summary=Summary(Operation.REPLACE))
assert "Operation not implemented: Operation.REPLACE" in str(e.value)
- with pytest.raises(ValueError) as e:
- update_snapshot_summaries(summary=Summary(Operation.DELETE))
- assert "Operation not implemented: Operation.DELETE" in str(e.value)
-
def test_invalid_type() -> None:
with pytest.raises(ValueError) as e: