This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f574d38 Support partial deletes (#569)
3f574d38 is described below

commit 3f574d389b4b5cd17654638a40963eacf65563f1
Author: Fokko Driesprong <[email protected]>
AuthorDate: Tue Jul 9 11:36:43 2024 +0200

    Support partial deletes (#569)
    
    * Add option to delete datafiles
    
    This is done through the Iceberg metadata, resulting
    in efficient deletes if the data is partitioned correctly
    
    * Pull in main
    
    * WIP
    
    * Change DataScan to accept Metadata and io
    
    For the partial deletes I want to do a scan on in
    memory metadata. Changing this API allows this.
    
    * fix name-mapping issue
    
    * WIP
    
    * WIP
    
    * Moar tests
    
    * Oops
    
    * Cleanup
    
    * WIP
    
    * WIP
    
    * Fix summary generation
    
    * Last few bits
    
    * Fix the requirement
    
    * Make ruff happy
    
    * Comments, thanks Kevin!
    
    * Comments
    
    * Append rather than truncate
    
    * Fix merge conflicts
    
    * Make the tests pass
    
    * Add another test
    
    * Conflicts
    
    * Add docs (#33)
    
    * docs
    
    * docs
    
    * Add a partitioned overwrite test
    
    * Fix comment
    
    * Skip empty manifests
    
    ---------
    
    Co-authored-by: HonahX <[email protected]>
    Co-authored-by: Sung Yun <[email protected]>
---
 mkdocs/docs/api.md                                 |  21 +-
 pyiceberg/io/pyarrow.py                            |  64 ++-
 pyiceberg/manifest.py                              |   2 +-
 pyiceberg/table/__init__.py                        | 451 +++++++++++++++++----
 pyiceberg/table/snapshots.py                       |   2 +-
 tests/catalog/integration_test_glue.py             |   3 +-
 tests/catalog/test_sql.py                          |   3 +-
 tests/conftest.py                                  |  32 +-
 tests/integration/test_deletes.py                  | 419 +++++++++++++++++++
 tests/integration/test_inspect_table.py            |  18 +-
 tests/integration/test_rest_schema.py              |  18 +-
 .../test_writes/test_partitioned_writes.py         |  20 +-
 tests/integration/test_writes/test_writes.py       | 107 +++--
 tests/table/test_snapshots.py                      |   4 -
 14 files changed, 1025 insertions(+), 139 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0e80b6eb..7386d029 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -331,12 +331,25 @@ df = pa.Table.from_pylist(
 table.append(df)
 ```
 
-<!-- prettier-ignore-start -->
+You can delete some of the data from the table by calling `tbl.delete()` with 
a desired `delete_filter`.
+
+```python
+tbl.delete(delete_filter="city == 'Paris'")
+```
 
-!!! example "Under development"
-    Writing using PyIceberg is still under development. Support for [partial 
overwrites](https://github.com/apache/iceberg-python/issues/268) and writing to 
[partitioned tables](https://github.com/apache/iceberg-python/issues/208) is 
planned and being worked on.
+In the above example, any records where the city field value equals to `Paris` 
will be deleted.
+Running `tbl.scan().to_arrow()` will now yield:
 
-<!-- prettier-ignore-end -->
+```
+pyarrow.Table
+city: string
+lat: double
+long: double
+----
+city: [["Amsterdam","San Francisco","Drachten"],["Groningen"]]
+lat: [[52.371807,37.773972,53.11254],[53.21917]]
+long: [[4.896029,-122.431297,6.0989],[6.56667]]
+```
 
 ## Inspecting tables
 
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index e6490ae1..50406972 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -31,6 +31,7 @@ import itertools
 import logging
 import os
 import re
+import uuid
 from abc import ABC, abstractmethod
 from concurrent.futures import Future
 from copy import copy
@@ -126,7 +127,6 @@ from pyiceberg.schema import (
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
@@ -159,7 +159,7 @@ from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, 
truncate_upper_bound_text_string
 
 if TYPE_CHECKING:
-    from pyiceberg.table import FileScanTask
+    from pyiceberg.table import FileScanTask, WriteTask
 
 logger = logging.getLogger(__name__)
 
@@ -1563,6 +1563,8 @@ class 
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
     _default_mode: str
 
     def __init__(self, schema: Schema, properties: Dict[str, str]):
+        from pyiceberg.table import TableProperties
+
         self._schema = schema
         self._properties = properties
         self._default_mode = self._properties.get(
@@ -1598,6 +1600,8 @@ class 
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
         return k + v
 
     def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
+        from pyiceberg.table import TableProperties
+
         column_name = self._schema.find_column_name(self._field_id)
         if column_name is None:
             return []
@@ -1895,6 +1899,8 @@ def data_file_statistics_from_parquet_metadata(
 
 
 def write_file(io: FileIO, table_metadata: TableMetadata, tasks: 
Iterator[WriteTask]) -> Iterator[DataFile]:
+    from pyiceberg.table import PropertyUtil, TableProperties
+
     parquet_writer_kwargs = 
_get_parquet_writer_kwargs(table_metadata.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table_metadata.properties,
@@ -2005,6 +2011,8 @@ PYARROW_UNCOMPRESSED_CODEC = "none"
 
 
 def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    from pyiceberg.table import PropertyUtil, TableProperties
+
     for key_pattern in [
         TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         TableProperties.PARQUET_PAGE_ROW_LIMIT,
@@ -2042,3 +2050,55 @@ def _get_parquet_writer_kwargs(table_properties: 
Properties) -> Dict[str, Any]:
             default=TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT,
         ),
     }
+
+
+def _dataframe_to_data_files(
+    table_metadata: TableMetadata,
+    df: pa.Table,
+    io: FileIO,
+    write_uuid: Optional[uuid.UUID] = None,
+    counter: Optional[itertools.count[int]] = None,
+) -> Iterable[DataFile]:
+    """Convert a PyArrow table into a DataFile.
+
+    Returns:
+        An iterable that supplies datafiles that represent the table.
+    """
+    from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+
+    counter = counter or itertools.count(0)
+    write_uuid = write_uuid or uuid.uuid4()
+    target_file_size: int = PropertyUtil.property_as_int(  # type: ignore  # 
The property is set with non-None value.
+        properties=table_metadata.properties,
+        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+    )
+
+    if table_metadata.spec().is_unpartitioned():
+        yield from write_file(
+            io=io,
+            table_metadata=table_metadata,
+            tasks=iter([
+                WriteTask(write_uuid=write_uuid, task_id=next(counter), 
record_batches=batches, schema=table_metadata.schema())
+                for batches in bin_pack_arrow_table(df, target_file_size)
+            ]),
+        )
+    else:
+        from pyiceberg.table import _determine_partitions
+
+        partitions = _determine_partitions(spec=table_metadata.spec(), 
schema=table_metadata.schema(), arrow_table=df)
+        yield from write_file(
+            io=io,
+            table_metadata=table_metadata,
+            tasks=iter([
+                WriteTask(
+                    write_uuid=write_uuid,
+                    task_id=next(counter),
+                    record_batches=batches,
+                    partition_key=partition.partition_key,
+                    schema=table_metadata.schema(),
+                )
+                for partition in partitions
+                for batches in 
bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
+            ]),
+        )
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 4fd82fec..e6a81d2a 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -341,7 +341,7 @@ class DataFile(Record):
     split_offsets: Optional[List[int]]
     equality_ids: Optional[List[int]]
     sort_order_id: Optional[int]
-    spec_id: Optional[int]
+    spec_id: int
 
     def __setattr__(self, name: str, value: Any) -> None:
         """Assign a key/value to a DataFile."""
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 2eec4d30..8eea9859 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -50,19 +50,27 @@ import pyiceberg.expressions.parser as parser
 from pyiceberg.conversions import from_bytes
 from pyiceberg.exceptions import CommitFailedException, ResolveError, 
ValidationError
 from pyiceberg.expressions import (
+    AlwaysFalse,
     AlwaysTrue,
     And,
     BooleanExpression,
     EqualTo,
+    Not,
+    Or,
     Reference,
 )
 from pyiceberg.expressions.visitors import (
+    ROWS_CANNOT_MATCH,
+    ROWS_MUST_MATCH,
     _InclusiveMetricsEvaluator,
+    _StrictMetricsEvaluator,
+    bind,
     expression_evaluator,
     inclusive_projection,
     manifest_evaluator,
 )
 from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, 
expression_to_pyarrow, project_table
 from pyiceberg.manifest import (
     POSITIONAL_DELETE_SCHEMA,
     DataFile,
@@ -238,6 +246,11 @@ class TableProperties:
     WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"
     WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0
 
+    DELETE_MODE = "write.delete.mode"
+    DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
+    DELETE_MODE_MERGE_ON_READ = "merge-on-read"
+    DELETE_MODE_DEFAULT = DELETE_MODE_COPY_ON_WRITE
+
     DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
     FORMAT_VERSION = "format-version"
     DEFAULT_FORMAT_VERSION = 2
@@ -305,7 +318,13 @@ class Transaction:
             requirement.validate(self.table_metadata)
 
         self._updates += updates
-        self._requirements += requirements
+
+        # For the requirements, it does not make sense to add a requirement 
more than once
+        # For example, you cannot assert that the current schema has two 
different IDs
+        existing_requirements = {type(requirement) for requirement in 
self._requirements}
+        for new_requirement in requirements:
+            if type(new_requirement) not in existing_requirements:
+                self._requirements = self._requirements + requirements
 
         self.table_metadata = update_table_metadata(self.table_metadata, 
updates)
 
@@ -316,6 +335,14 @@ class Transaction:
 
         return self
 
+    def _scan(self, row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE) 
-> DataScan:
+        """Minimal data scan the table with the current state of the 
transaction."""
+        return DataScan(
+            table_metadata=self.table_metadata,
+            io=self._table.io,
+            row_filter=row_filter,
+        )
+
     def upgrade_table_version(self, format_version: TableVersion) -> 
Transaction:
         """Set the table to a certain version.
 
@@ -499,11 +526,20 @@ class Transaction:
                     update_snapshot.append_data_file(data_file)
 
     def overwrite(
-        self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, 
snapshot_properties: Dict[str, str] = EMPTY_DICT
+        self,
+        df: pa.Table,
+        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         """
         Shorthand for adding a table overwrite with a PyArrow table to the 
transaction.
 
+        An overwrite may produce zero or more snapshots based on the operation:
+
+            - DELETE: In case existing Parquet files can be dropped completely.
+            - REPLACE: In case existing Parquet files need to be rewritten.
+            - APPEND: In case new data is being inserted into the table.
+
         Args:
             df: The Arrow dataframe that will be used to overwrite the table
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
@@ -518,11 +554,12 @@ class Transaction:
         if not isinstance(df, pa.Table):
             raise ValueError(f"Expected PyArrow table, got: {df}")
 
-        if overwrite_filter != AlwaysTrue():
-            raise NotImplementedError("Cannot overwrite a subset of a table")
-
-        if len(self._table.spec().fields) > 0:
-            raise ValueError("Cannot write to partitioned tables")
+        if unsupported_partitions := [
+            field for field in self.table_metadata.spec().fields if not 
field.transform.supports_pyarrow_transform
+        ]:
+            raise ValueError(
+                f"Not all partition types are supported for writes. Following 
partitions cannot be written using pyarrow: {unsupported_partitions}."
+            )
 
         _check_schema_compatible(self._table.schema(), other_schema=df.schema)
         # cast if the two schemas are compatible but not equal
@@ -530,7 +567,9 @@ class Transaction:
         if table_arrow_schema != df.schema:
             df = df.cast(table_arrow_schema)
 
-        with 
self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as 
update_snapshot:
+        self.delete(delete_filter=overwrite_filter, 
snapshot_properties=snapshot_properties)
+
+        with 
self.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as 
update_snapshot:
             # skip writing data files if the dataframe is empty
             if df.shape[0] > 0:
                 data_files = _dataframe_to_data_files(
@@ -539,6 +578,86 @@ class Transaction:
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
+    def delete(self, delete_filter: Union[str, BooleanExpression], 
snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for deleting record from a table.
+
+        An deletee may produce zero or more snapshots based on the operation:
+
+            - DELETE: In case existing Parquet files can be dropped completely.
+            - REPLACE: In case existing Parquet files need to be rewritten
+
+        Args:
+            delete_filter: A boolean expression to delete rows from a table
+            snapshot_properties: Custom properties to be added to the snapshot 
summary
+        """
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, 
TableProperties.DELETE_MODE_DEFAULT)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            warnings.warn("Merge on read is not yet supported, falling back to 
copy-on-write")
+
+        if isinstance(delete_filter, str):
+            delete_filter = _parse_row_filter(delete_filter)
+
+        with 
self.update_snapshot(snapshot_properties=snapshot_properties).delete() as 
delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a 
data file
+        if delete_snapshot.rewrites_needed is True:
+            bound_delete_filter = bind(self._table.schema(), delete_filter, 
case_sensitive=True)
+            preserve_row_filter = 
expression_to_pyarrow(Not(bound_delete_filter))
+
+            files = self._scan(row_filter=delete_filter).plan_files()
+
+            commit_uuid = uuid.uuid4()
+            counter = itertools.count(0)
+
+            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
+            # This will load the Parquet file into memory, including:
+            #   - Filter out the rows based on the delete filter
+            #   - Projecting it to the current schema
+            #   - Applying the positional deletes if they are there
+            # When writing
+            #   - Apply the latest partition-spec
+            #   - And sort order when added
+            for original_file in files:
+                df = project_table(
+                    tasks=[original_file],
+                    table_metadata=self._table.metadata,
+                    io=self._table.io,
+                    row_filter=AlwaysTrue(),
+                    projected_schema=self.table_metadata.schema(),
+                )
+                filtered_df = df.filter(preserve_row_filter)
+
+                # Only rewrite if there are records being deleted
+                if len(df) != len(filtered_df):
+                    replaced_files.append((
+                        original_file.file,
+                        list(
+                            _dataframe_to_data_files(
+                                io=self._table.io,
+                                df=filtered_df,
+                                table_metadata=self._table.metadata,
+                                write_uuid=commit_uuid,
+                                counter=counter,
+                            )
+                        ),
+                    ))
+
+            if len(replaced_files) > 0:
+                with 
self.update_snapshot(snapshot_properties=snapshot_properties).overwrite(
+                    commit_uuid=commit_uuid
+                ) as overwrite_snapshot:
+                    for original_data_file, replaced_data_files in 
replaced_files:
+                        overwrite_snapshot.delete_data_file(original_data_file)
+                        for replaced_data_file in replaced_data_files:
+                            
overwrite_snapshot.append_data_file(replaced_data_file)
+
+        if not delete_snapshot.files_affected and not 
delete_snapshot.rewrites_needed:
+            warnings.warn("Delete operation did not match any records")
+
     def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, 
str] = EMPTY_DICT) -> None:
         """
         Shorthand API for adding files as data files to the table transaction.
@@ -1381,6 +1500,9 @@ class Table:
             return self.snapshot_by_id(self.metadata.current_snapshot_id)
         return None
 
+    def snapshots(self) -> List[Snapshot]:
+        return self.metadata.snapshots
+
     def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
         """Get the snapshot of this table with the given id, or None if there 
is no matching snapshot."""
         return self.metadata.snapshot_by_id(snapshot_id)
@@ -1455,11 +1577,20 @@ class Table:
             tx.append(df=df, snapshot_properties=snapshot_properties)
 
     def overwrite(
-        self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, 
snapshot_properties: Dict[str, str] = EMPTY_DICT
+        self,
+        df: pa.Table,
+        overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         """
         Shorthand for overwriting the table with a PyArrow table.
 
+        An overwrite may produce zero or more snapshots based on the operation:
+
+            - DELETE: In case existing Parquet files can be dropped completely.
+            - REPLACE: In case existing Parquet files need to be rewritten.
+            - APPEND: In case new data is being inserted into the table.
+
         Args:
             df: The Arrow dataframe that will be used to overwrite the table
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
@@ -1469,6 +1600,19 @@ class Table:
         with self.transaction() as tx:
             tx.overwrite(df=df, overwrite_filter=overwrite_filter, 
snapshot_properties=snapshot_properties)
 
+    def delete(
+        self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, 
snapshot_properties: Dict[str, str] = EMPTY_DICT
+    ) -> None:
+        """
+        Shorthand for deleting rows from the table.
+
+        Args:
+            delete_filter: The predicate that used to remove rows
+            snapshot_properties: Custom properties to be added to the snapshot 
summary
+        """
+        with self.transaction() as tx:
+            tx.delete(delete_filter=delete_filter, 
snapshot_properties=snapshot_properties)
+
     def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, 
str] = EMPTY_DICT) -> None:
         """
         Shorthand API for adding files as data files to the table.
@@ -2904,52 +3048,6 @@ def _generate_manifest_list_path(location: str, 
snapshot_id: int, attempt: int,
     return 
f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"
 
 
-def _dataframe_to_data_files(
-    table_metadata: TableMetadata, df: pa.Table, io: FileIO, write_uuid: 
Optional[uuid.UUID] = None
-) -> Iterable[DataFile]:
-    """Convert a PyArrow table into a DataFile.
-
-    Returns:
-        An iterable that supplies datafiles that represent the table.
-    """
-    from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
-
-    counter = itertools.count(0)
-    write_uuid = write_uuid or uuid.uuid4()
-    target_file_size: int = PropertyUtil.property_as_int(  # type: ignore  # 
The property is set with non-None value.
-        properties=table_metadata.properties,
-        property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
-    )
-
-    if len(table_metadata.spec().fields) > 0:
-        partitions = _determine_partitions(spec=table_metadata.spec(), 
schema=table_metadata.schema(), arrow_table=df)
-        yield from write_file(
-            io=io,
-            table_metadata=table_metadata,
-            tasks=iter([
-                WriteTask(
-                    write_uuid=write_uuid,
-                    task_id=next(counter),
-                    record_batches=batches,
-                    partition_key=partition.partition_key,
-                    schema=table_metadata.schema(),
-                )
-                for partition in partitions
-                for batches in 
bin_pack_arrow_table(partition.arrow_table_partition, target_file_size)
-            ]),
-        )
-    else:
-        yield from write_file(
-            io=io,
-            table_metadata=table_metadata,
-            tasks=iter([
-                WriteTask(write_uuid=write_uuid, task_id=next(counter), 
record_batches=batches, schema=table_metadata.schema())
-                for batches in bin_pack_arrow_table(df, target_file_size)
-            ]),
-        )
-
-
 def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: 
List[str], io: FileIO) -> Iterable[DataFile]:
     """Convert a list files into DataFiles.
 
@@ -2961,12 +3059,14 @@ def _parquet_files_to_data_files(table_metadata: 
TableMetadata, file_paths: List
     yield from parquet_files_to_data_files(io=io, 
table_metadata=table_metadata, file_paths=iter(file_paths))
 
 
-class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
+class _MergingSnapshotProducer(UpdateTableMetadata[U], Generic[U]):
     commit_uuid: uuid.UUID
     _operation: Operation
     _snapshot_id: int
     _parent_snapshot_id: Optional[int]
     _added_data_files: List[DataFile]
+    _deleted_data_files: Set[DataFile]
+    _manifest_counter: itertools.count[int]
 
     def __init__(
         self,
@@ -2986,12 +3086,18 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
             snapshot.snapshot_id if (snapshot := 
self._transaction.table_metadata.current_snapshot()) else None
         )
         self._added_data_files = []
+        self._deleted_data_files = set()
         self.snapshot_properties = snapshot_properties
+        self._manifest_counter = itertools.count(0)
 
-    def append_data_file(self, data_file: DataFile) -> 
_MergingSnapshotProducer:
+    def append_data_file(self, data_file: DataFile) -> 
_MergingSnapshotProducer[U]:
         self._added_data_files.append(data_file)
         return self
 
+    def delete_data_file(self, data_file: DataFile) -> 
_MergingSnapshotProducer[U]:
+        self._deleted_data_files.add(data_file)
+        return self
+
     @abstractmethod
     def _deleted_entries(self) -> List[ManifestEntry]: ...
 
@@ -3002,7 +3108,9 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
         def _write_added_manifest() -> List[ManifestFile]:
             if self._added_data_files:
                 output_file_location = _new_manifest_path(
-                    location=self._transaction.table_metadata.location, num=0, 
commit_uuid=self.commit_uuid
+                    location=self._transaction.table_metadata.location,
+                    num=next(self._manifest_counter),
+                    commit_uuid=self.commit_uuid,
                 )
                 with write_manifest(
                     
format_version=self._transaction.table_metadata.format_version,
@@ -3030,7 +3138,9 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
             deleted_entries = self._deleted_entries()
             if len(deleted_entries) > 0:
                 output_file_location = _new_manifest_path(
-                    location=self._transaction.table_metadata.location, num=1, 
commit_uuid=self.commit_uuid
+                    location=self._transaction.table_metadata.location,
+                    num=next(self._manifest_counter),
+                    commit_uuid=self.commit_uuid,
                 )
 
                 with write_manifest(
@@ -3070,6 +3180,15 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
                 schema=self._transaction.table_metadata.schema(),
             )
 
+        if len(self._deleted_data_files) > 0:
+            specs = self._transaction.table_metadata.specs()
+            for data_file in self._deleted_data_files:
+                ssc.remove_file(
+                    data_file=data_file,
+                    partition_spec=specs[data_file.spec_id],
+                    schema=self._transaction.table_metadata.schema(),
+                )
+
         previous_snapshot = (
             
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
             if self._parent_snapshot_id is not None
@@ -3119,11 +3238,153 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
                     snapshot_id=self._snapshot_id, 
parent_snapshot_id=self._parent_snapshot_id, ref_name="main", type="branch"
                 ),
             ),
-            (AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, 
ref="main"),),
+            
(AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id,
 ref="main"),),
         )
 
 
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the 
predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or 
delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, 
snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, 
self.partition_filters[spec_id], case_sensitive=True)
+
+    def delete_by_predicate(self, predicate: BooleanExpression) -> None:
+        self._predicate = Or(self._predicate, predicate)
+
+    @cached_property
+    def _compute_deletes(self) -> Tuple[List[ManifestFile], 
List[ManifestEntry], bool]:
+        """Computes all the delete operation and cache it when nothing changes.
+
+        Returns:
+            - List of existing manifests that are not affected by the delete 
operation.
+            - The manifest-entries that are deleted based on the metadata.
+            - Flag indicating that rewrites of data-files are needed.
+        """
+        schema = self._transaction.table_metadata.schema()
+
+        def _copy_with_new_status(entry: ManifestEntry, status: 
ManifestEntryStatus) -> ManifestEntry:
+            return ManifestEntry(
+                status=status,
+                snapshot_id=entry.snapshot_id,
+                data_sequence_number=entry.data_sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
+            )
+
+        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
+        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, 
self._predicate, case_sensitive=True).eval
+        inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, 
self._predicate, case_sensitive=True).eval
+
+        existing_manifests = []
+        total_deleted_entries = []
+        partial_rewrites_needed = False
+        self._deleted_data_files = set()
+        if snapshot := self._transaction.table_metadata.current_snapshot():
+            for manifest_file in snapshot.manifests(io=self._io):
+                if manifest_file.content == ManifestContent.DATA:
+                    if not 
manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+                        # If the manifest isn't relevant, we can just keep it 
in the manifest-list
+                        existing_manifests.append(manifest_file)
+                    else:
+                        # It is relevant, let's check out the content
+                        deleted_entries = []
+                        existing_entries = []
+                        for entry in 
manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+                            if strict_metrics_evaluator(entry.data_file) == 
ROWS_MUST_MATCH:
+                                
deleted_entries.append(_copy_with_new_status(entry, 
ManifestEntryStatus.DELETED))
+                                self._deleted_data_files.add(entry.data_file)
+                            elif inclusive_metrics_evaluator(entry.data_file) 
== ROWS_CANNOT_MATCH:
+                                
existing_entries.append(_copy_with_new_status(entry, 
ManifestEntryStatus.EXISTING))
+                            else:
+                                # Based on the metadata, it is unsure to say 
if the file can be deleted
+                                partial_rewrites_needed = True
+
+                        if len(deleted_entries) > 0:
+                            total_deleted_entries += deleted_entries
+
+                            # Rewrite the manifest
+                            if len(existing_entries) > 0:
+                                output_file_location = _new_manifest_path(
+                                    
location=self._transaction.table_metadata.location,
+                                    num=next(self._manifest_counter),
+                                    commit_uuid=self.commit_uuid,
+                                )
+                                with write_manifest(
+                                    
format_version=self._transaction.table_metadata.format_version,
+                                    
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+                                    
schema=self._transaction.table_metadata.schema(),
+                                    
output_file=self._io.new_output(output_file_location),
+                                    snapshot_id=self._snapshot_id,
+                                ) as writer:
+                                    for existing_entry in existing_entries:
+                                        writer.add_entry(existing_entry)
+                                
existing_manifests.append(writer.to_manifest_file())
+                            # else:
+                            # deleted_manifests.append()
+                        else:
+                            existing_manifests.append(manifest_file)
+                else:
+                    existing_manifests.append(manifest_file)
+
+        return existing_manifests, total_deleted_entries, 
partial_rewrites_needed
+
+    def _existing_manifests(self) -> List[ManifestFile]:
+        return self._compute_deletes[0]
+
+    def _deleted_entries(self) -> List[ManifestEntry]:
+        return self._compute_deletes[1]
+
+    @property
+    def rewrites_needed(self) -> bool:
+        """Indicate if data files need to be rewritten."""
+        return self._compute_deletes[2]
+
+    @property
+    def files_affected(self) -> bool:
+        """Indicate if any manifest-entries can be dropped."""
+        return len(self._deleted_entries()) > 0
+
+
+class FastAppendFiles(_MergingSnapshotProducer["FastAppendFiles"]):
     def _existing_manifests(self) -> List[ManifestFile]:
         """To determine if there are any existing manifest files.
 
@@ -3152,14 +3413,53 @@ class FastAppendFiles(_MergingSnapshotProducer):
         return []
 
 
-class OverwriteFiles(_MergingSnapshotProducer):
+class OverwriteFiles(_MergingSnapshotProducer["OverwriteFiles"]):
+    """Overwrites data from the table. This will produce an OVERWRITE snapshot.
+
+    Data and delete files were added and removed in a logical overwrite 
operation.
+    """
+
     def _existing_manifests(self) -> List[ManifestFile]:
-        """To determine if there are any existing manifest files.
+        """Determine if there are any existing manifest files."""
+        existing_files = []
 
-        In the of a full overwrite, all the previous manifests are
-        considered deleted.
-        """
-        return []
+        if snapshot := self._transaction.table_metadata.current_snapshot():
+            for manifest_file in snapshot.manifests(io=self._io):
+                entries = manifest_file.fetch_manifest_entry(io=self._io, 
discard_deleted=True)
+                found_deleted_data_files = [entry.data_file for entry in 
entries if entry.data_file in self._deleted_data_files]
+
+                if len(found_deleted_data_files) == 0:
+                    existing_files.append(manifest_file)
+                else:
+                    # We have to rewrite the
+                    output_file_location = _new_manifest_path(
+                        location=self._transaction.table_metadata.location,
+                        num=next(self._manifest_counter),
+                        commit_uuid=self.commit_uuid,
+                    )
+                    if any(entry.data_file not in found_deleted_data_files for 
entry in entries):
+                        with write_manifest(
+                            
format_version=self._transaction.table_metadata.format_version,
+                            spec=self._transaction.table_metadata.spec(),
+                            schema=self._transaction.table_metadata.schema(),
+                            
output_file=self._io.new_output(output_file_location),
+                            snapshot_id=self._snapshot_id,
+                        ) as writer:
+                            [
+                                writer.add_entry(
+                                    ManifestEntry(
+                                        status=ManifestEntryStatus.EXISTING,
+                                        snapshot_id=entry.snapshot_id,
+                                        
data_sequence_number=entry.data_sequence_number,
+                                        
file_sequence_number=entry.file_sequence_number,
+                                        data_file=entry.data_file,
+                                    )
+                                )
+                                for entry in entries
+                                if entry.data_file not in 
found_deleted_data_files
+                            ]
+                        existing_files.append(writer.to_manifest_file())
+        return existing_files
 
     def _deleted_entries(self) -> List[ManifestEntry]:
         """To determine if we need to record any deleted entries.
@@ -3186,7 +3486,7 @@ class OverwriteFiles(_MergingSnapshotProducer):
                         data_file=entry.data_file,
                     )
                     for entry in manifest.fetch_manifest_entry(self._io, 
discard_deleted=True)
-                    if entry.data_file.content == DataFileContent.DATA
+                    if entry.data_file.content == DataFileContent.DATA and 
entry.data_file in self._deleted_data_files
                 ]
 
             list_of_entries = executor.map(_get_entries, 
previous_snapshot.manifests(self._io))
@@ -3200,7 +3500,7 @@ class UpdateSnapshot:
     _io: FileIO
     _snapshot_properties: Dict[str, str]
 
-    def __init__(self, transaction: Transaction, io: FileIO, 
snapshot_properties: Dict[str, str]) -> None:
+    def __init__(self, transaction: Transaction, io: FileIO, 
snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         self._transaction = transaction
         self._io = io
         self._snapshot_properties = snapshot_properties
@@ -3210,8 +3510,9 @@ class UpdateSnapshot:
             operation=Operation.APPEND, transaction=self._transaction, 
io=self._io, snapshot_properties=self._snapshot_properties
         )
 
-    def overwrite(self) -> OverwriteFiles:
+    def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> 
OverwriteFiles:
         return OverwriteFiles(
+            commit_uuid=commit_uuid,
             operation=Operation.OVERWRITE
             if self._transaction.table_metadata.current_snapshot() is not None
             else Operation.APPEND,
@@ -3220,6 +3521,14 @@ class UpdateSnapshot:
             snapshot_properties=self._snapshot_properties,
         )
 
+    def delete(self) -> DeleteFiles:
+        return DeleteFiles(
+            operation=Operation.DELETE,
+            transaction=self._transaction,
+            io=self._io,
+            snapshot_properties=self._snapshot_properties,
+        )
+
 
 class UpdateSpec(UpdateTableMetadata["UpdateSpec"]):
     _transaction: Transaction
@@ -4056,7 +4365,7 @@ def _determine_partitions(spec: PartitionSpec, schema: 
Schema, arrow_table: pa.T
     {'year': [2020, 2022, 2022, 2021, 2022, 2022, 2022, 2019, 2021],
      'n_legs': [2, 2, 2, 4, 4, 4, 4, 5, 100],
      'animal': ["Flamingo", "Parrot", "Parrot", "Dog", "Horse", "Horse", 
"Horse","Brittle stars", "Centipede"]}.
-    The algrithm:
+    The algorithm:
     Firstly we group the rows into partitions by sorting with sort order 
[('n_legs', 'descending'), ('year', 'descending')]
     and null_placement of "at_end".
     This gives the same table as raw input.
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 842d4252..1ccb0799 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -352,7 +352,7 @@ def _truncate_table_summary(summary: Summary, 
previous_summary: Mapping[str, str
 def update_snapshot_summaries(
     summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, 
truncate_full_table: bool = False
 ) -> Summary:
-    if summary.operation not in {Operation.APPEND, Operation.OVERWRITE}:
+    if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, 
Operation.DELETE}:
         raise ValueError(f"Operation not implemented: {summary.operation}")
 
     if truncate_full_table and summary.operation == Operation.OVERWRITE and 
previous_summary is not None:
diff --git a/tests/catalog/integration_test_glue.py 
b/tests/catalog/integration_test_glue.py
index 21c41521..c69bc86c 100644
--- a/tests/catalog/integration_test_glue.py
+++ b/tests/catalog/integration_test_glue.py
@@ -33,9 +33,8 @@ from pyiceberg.exceptions import (
     NoSuchTableError,
     TableAlreadyExistsError,
 )
-from pyiceberg.io.pyarrow import schema_to_pyarrow
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
 from pyiceberg.schema import Schema
-from pyiceberg.table import _dataframe_to_data_files
 from pyiceberg.types import IntegerType
 from tests.conftest import clean_up, get_bucket_name, get_s3_path
 
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index bb8fcea3..9251d717 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -39,10 +39,9 @@ from pyiceberg.exceptions import (
     TableAlreadyExistsError,
 )
 from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
-from pyiceberg.io.pyarrow import schema_to_pyarrow
+from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
 from pyiceberg.schema import Schema
-from pyiceberg.table import _dataframe_to_data_files
 from pyiceberg.table.snapshots import Operation
 from pyiceberg.table.sorting import (
     NullOrder,
diff --git a/tests/conftest.py b/tests/conftest.py
index d200f3ab..95e1128a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2299,7 +2299,37 @@ def arrow_table_with_null(pa_schema: "pa.Schema") -> 
"pa.Table":
     """Pyarrow table with all kinds of columns."""
     import pyarrow as pa
 
-    return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
+    return pa.Table.from_pydict(
+        {
+            "bool": [False, None, True],
+            "string": ["a", None, "z"],
+            # Go over the 16 bytes to kick in truncation
+            "string_long": ["a" * 22, None, "z" * 22],
+            "int": [1, None, 9],
+            "long": [1, None, 9],
+            "float": [0.0, None, 0.9],
+            "double": [0.0, None, 0.9],
+            # 'time': [1_000_000, None, 3_000_000],  # Example times: 1s, 
none, and 3s past midnight #Spark does not support time fields
+            "timestamp": [datetime(2023, 1, 1, 19, 25, 00), None, 
datetime(2023, 3, 1, 19, 25, 00)],
+            "timestamptz": [
+                datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
+                None,
+                datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
+            ],
+            "date": [date(2023, 1, 1), None, date(2023, 3, 1)],
+            # Not supported by Spark
+            # 'time': [time(1, 22, 0), None, time(19, 25, 0)],
+            # Not natively supported by Arrow
+            # 'uuid': 
[uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, 
uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
+            "binary": [b"\01", None, b"\22"],
+            "fixed": [
+                uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+                None,
+                uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+            ],
+        },
+        schema=pa_schema,
+    )
 
 
 @pytest.fixture(scope="session")
diff --git a/tests/integration/test_deletes.py 
b/tests/integration/test_deletes.py
new file mode 100644
index 00000000..ad3adede
--- /dev/null
+++ b/tests/integration/test_deletes.py
@@ -0,0 +1,419 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+from typing import List
+
+import pyarrow as pa
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.expressions import AlwaysTrue, EqualTo
+from pyiceberg.manifest import ManifestEntryStatus
+from pyiceberg.schema import Schema
+from pyiceberg.table.snapshots import Operation, Summary
+from pyiceberg.types import IntegerType, NestedField
+
+
+def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
+    for sql in sqls:
+        spark.sql(sql)
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_partitioned_table_delete_full_file(spark: SparkSession, 
session_catalog: RestCatalog, format_version: int) -> None:
+    identifier = "default.table_partitioned_delete"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 10))
+
+    # No overwrite operation
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] 
== ["append", "append", "delete"]
+    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 
11], "number": [20, 30]}
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: 
RestCatalog, format_version: int) -> None:
+    identifier = "default.table_partitioned_delete"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number", 20))
+
+    # We don't delete a whole partition, so there is only a overwrite
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] 
== ["append", "append", "overwrite"]
+    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 
10], "number": [30, 30]}
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_partitioned_table_no_match(spark: SparkSession, session_catalog: 
RestCatalog, format_version: int) -> None:
+    identifier = "default.table_partitioned_delete"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 22))  # Does not affect any data
+
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] 
== ["append"]
+    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 
10], "number": [20, 30]}
+
+
[email protected]
+def test_delete_partitioned_table_positional_deletes(spark: SparkSession, 
session_catalog: RestCatalog) -> None:
+    identifier = "default.table_partitioned_delete"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30), (10, 40)
+        """,
+            # Generate a positional delete
+            f"""
+            DELETE FROM {identifier} WHERE number = 30
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    # Assert that there is just a single Parquet file, that has one merge on 
read file
+    files = list(tbl.scan().plan_files())
+    assert len(files) == 1
+    assert len(files[0].delete_files) == 1
+
+    # Will rewrite a data file without the positional delete
+    tbl.delete(EqualTo("number", 40))
+
+    # One positional delete has been added, but an OVERWRITE status is set
+    # https://github.com/apache/iceberg/issues/10122
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] 
== ["append", "overwrite", "overwrite"]
+    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], 
"number": [20]}
+
+
[email protected]
+def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: 
RestCatalog) -> None:
+    identifier = "default.table_partitioned_delete"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 1), (10, 2), (20, 3)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    files = list(tbl.scan().plan_files())
+    assert len(files) == 2
+
+    arrow_schema = pa.schema([pa.field("number_partitioned", pa.int32()), 
pa.field("number", pa.int32())])
+    arrow_tbl = pa.Table.from_pylist(
+        [
+            {"number_partitioned": 10, "number": 4},
+            {"number_partitioned": 10, "number": 5},
+        ],
+        schema=arrow_schema,
+    )
+
+    # Will rewrite a data file without the positional delete
+    tbl.overwrite(arrow_tbl, "number_partitioned == 10")
+
+    # One positional delete has been added, but an OVERWRITE status is set
+    # https://github.com/apache/iceberg/issues/10122
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] 
== ["append", "delete", "append"]
+    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 
10, 20], "number": [4, 5, 3]}
+
+
[email protected]
+def test_partitioned_table_positional_deletes_sequence_number(spark: 
SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.table_partitioned_delete_sequence_number"
+
+    # This test case is a bit more complex. Here we run a MoR delete on a 
file, we make sure that
+    # the manifest gets rewritten (but not the data file with a MoR), and 
check if the delete is still there
+    # to assure that the sequence numbers are maintained
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 100), (10, 101), (20, 200), 
(20, 201), (20, 202)
+        """,
+            # Generate a positional delete
+            f"""
+            DELETE FROM {identifier} WHERE number = 101
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    files = list(tbl.scan().plan_files())
+    assert len(files) == 2
+
+    # Will rewrite a data file without a positional delete
+    tbl.delete(EqualTo("number", 201))
+
+    # One positional delete has been added, but an OVERWRITE status is set
+    # https://github.com/apache/iceberg/issues/10122
+    snapshots = tbl.snapshots()
+    assert len(snapshots) == 3
+
+    # Snapshots produced by Spark
+    assert [snapshot.summary.operation.value for snapshot in 
tbl.snapshots()[0:2]] == ["append", "overwrite"]
+
+    # Will rewrite one parquet file
+    assert snapshots[2].summary == Summary(
+        Operation.OVERWRITE,
+        **{
+            "added-files-size": "1145",
+            "added-data-files": "1",
+            "added-records": "2",
+            "changed-partition-count": "1",
+            "total-files-size": snapshots[2].summary["total-files-size"],
+            "total-delete-files": "0",
+            "total-data-files": "1",
+            "total-position-deletes": "0",
+            "total-records": "2",
+            "total-equality-deletes": "0",
+            "deleted-data-files": "2",
+            "removed-delete-files": "1",
+            "deleted-records": "5",
+            "removed-files-size": snapshots[2].summary["removed-files-size"],
+            "removed-position-deletes": "1",
+        },
+    )
+
+    assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [20, 
20, 10], "number": [200, 202, 100]}
+
+
[email protected]
+def test_delete_no_match(session_catalog: RestCatalog) -> None:
+    arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+    arrow_tbl = pa.Table.from_pylist(
+        [
+            {"ints": 1},
+            {"ints": 3},
+        ],
+        schema=arrow_schema,
+    )
+
+    iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+    tbl_identifier = "default.test_delete_no_match"
+
+    try:
+        session_catalog.drop_table(tbl_identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+    tbl.append(arrow_tbl)
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == 
[Operation.APPEND]
+
+    tbl.delete("ints == 2")  # Only 1 and 3 in the file, but is between the 
lower and upper bound
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == 
[Operation.APPEND]
+
+
[email protected]
+def test_delete_overwrite(session_catalog: RestCatalog) -> None:
+    arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+    arrow_tbl = pa.Table.from_pylist(
+        [
+            {"ints": 1},
+            {"ints": 2},
+        ],
+        schema=arrow_schema,
+    )
+
+    iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+    tbl_identifier = "default.test_delete_overwrite"
+
+    try:
+        session_catalog.drop_table(tbl_identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+    tbl.append(arrow_tbl)
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == 
[Operation.APPEND]
+
+    arrow_tbl_overwrite = pa.Table.from_pylist(
+        [
+            {"ints": 3},
+            {"ints": 4},
+        ],
+        schema=arrow_schema,
+    )
+    tbl.overwrite(arrow_tbl_overwrite, "ints == 2")  # Should rewrite one file
+
+    assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [
+        Operation.APPEND,
+        Operation.OVERWRITE,
+        Operation.APPEND,
+    ]
+
+    assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1]
+
+
[email protected]
+def test_delete_truncate(session_catalog: RestCatalog) -> None:
+    arrow_schema = pa.schema([pa.field("ints", pa.int32())])
+    arrow_tbl = pa.Table.from_pylist(
+        [
+            {"ints": 1},
+        ],
+        schema=arrow_schema,
+    )
+
+    iceberg_schema = Schema(NestedField(1, "ints", IntegerType()))
+
+    tbl_identifier = "default.test_delete_overwrite"
+
+    try:
+        session_catalog.drop_table(tbl_identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(tbl_identifier, iceberg_schema)
+    tbl.append(arrow_tbl)
+
+    # Effectively a truncate
+    tbl.delete(delete_filter=AlwaysTrue())
+
+    manifests = tbl.current_snapshot().manifests(tbl.io)
+    assert len(manifests) == 1
+
+    entries = manifests[0].fetch_manifest_entry(tbl.io, discard_deleted=False)
+    assert len(entries) == 1
+
+    assert entries[0].status == ManifestEntryStatus.DELETED
diff --git a/tests/integration/test_inspect_table.py 
b/tests/integration/test_inspect_table.py
index 834fe83d..d8a83e0d 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -103,13 +103,14 @@ def test_inspect_snapshots(
         assert isinstance(snapshot_id.as_py(), int)
 
     assert df["parent_id"][0].as_py() is None
-    assert df["parent_id"][1:] == df["snapshot_id"][:2]
+    assert df["parent_id"][1:].to_pylist() == 
df["snapshot_id"][:-1].to_pylist()
 
-    assert [operation.as_py() for operation in df["operation"]] == ["append", 
"overwrite", "append"]
+    assert [operation.as_py() for operation in df["operation"]] == ["append", 
"delete", "append", "append"]
 
     for manifest_list in df["manifest_list"]:
         assert manifest_list.as_py().startswith("s3://")
 
+    # Append
     assert df["summary"][0].as_py() == [
         ("added-files-size", "5459"),
         ("added-data-files", "1"),
@@ -122,6 +123,19 @@ def test_inspect_snapshots(
         ("total-equality-deletes", "0"),
     ]
 
+    # Delete
+    assert df["summary"][1].as_py() == [
+        ("removed-files-size", "5459"),
+        ("deleted-data-files", "1"),
+        ("deleted-records", "3"),
+        ("total-data-files", "0"),
+        ("total-delete-files", "0"),
+        ("total-records", "0"),
+        ("total-files-size", "0"),
+        ("total-position-deletes", "0"),
+        ("total-equality-deletes", "0"),
+    ]
+
     lhs = spark.table(f"{identifier}.snapshots").toPandas()
     rhs = df.to_pandas()
     for column in df.column_names:
diff --git a/tests/integration/test_rest_schema.py 
b/tests/integration/test_rest_schema.py
index f4ab98a8..644cb805 100644
--- a/tests/integration/test_rest_schema.py
+++ b/tests/integration/test_rest_schema.py
@@ -2512,14 +2512,18 @@ def 
test_two_add_schemas_in_a_single_transaction(catalog: Catalog) -> None:
         ),
     )
 
-    with pytest.raises(CommitFailedException) as exc_info:
-        with tbl.transaction() as tr:
-            with tr.update_schema() as update:
-                update.add_column("bar", field_type=StringType())
-            with tr.update_schema() as update:
-                update.add_column("baz", field_type=StringType())
+    with tbl.transaction() as tr:
+        with tr.update_schema() as update:
+            update.add_column("bar", field_type=StringType())
+        with tr.update_schema() as update:
+            update.add_column("baz", field_type=StringType())
 
-    assert "CommitFailedException: Requirement failed: current schema changed: 
expected id 1 != 0" in str(exc_info.value)
+    assert tbl.schema().schema_id == 2
+    assert tbl.schema() == Schema(
+        NestedField(field_id=1, name="foo", field_type=StringType()),
+        NestedField(field_id=2, name="bar", field_type=StringType()),
+        NestedField(field_id=3, name="baz", field_type=StringType()),
+    )
 
 
 @pytest.mark.integration
diff --git a/tests/integration/test_writes/test_partitioned_writes.py 
b/tests/integration/test_writes/test_partitioned_writes.py
index f6e6e93c..59bb7693 100644
--- a/tests/integration/test_writes/test_partitioned_writes.py
+++ b/tests/integration/test_writes/test_partitioned_writes.py
@@ -38,7 +38,6 @@ from pyiceberg.transforms import (
     TruncateTransform,
     YearTransform,
 )
-from tests.conftest import TEST_DATA_WITH_NULL
 from utils import TABLE_SCHEMA, _create_table
 
 
@@ -70,7 +69,7 @@ def test_query_filter_null_partitioned(
     assert tbl.format_version == format_version, f"Expected v{format_version}, 
got: v{tbl.format_version}"
     df = spark.table(identifier)
     assert df.count() == 3, f"Expected 3 total rows for {identifier}"
-    for col in TEST_DATA_WITH_NULL.keys():
+    for col in arrow_table_with_null.column_names:
         assert df.where(f"{col} is not null").count() == 2, f"Expected 2 
non-null rows for {col}"
         assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row 
for {col} is null"
 
@@ -81,7 +80,12 @@ def test_query_filter_null_partitioned(
 )
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_query_filter_without_data_partitioned(
-    session_catalog: Catalog, spark: SparkSession, arrow_table_without_data: 
pa.Table, part_col: str, format_version: int
+    session_catalog: Catalog,
+    spark: SparkSession,
+    arrow_table_without_data: pa.Table,
+    part_col: str,
+    arrow_table_with_null: pa.Table,
+    format_version: int,
 ) -> None:
     # Given
     identifier = 
f"default.arrow_table_v{format_version}_without_data_partitioned_on_col_{part_col}"
@@ -102,7 +106,7 @@ def test_query_filter_without_data_partitioned(
     # Then
     assert tbl.format_version == format_version, f"Expected v{format_version}, 
got: v{tbl.format_version}"
     df = spark.table(identifier)
-    for col in TEST_DATA_WITH_NULL.keys():
+    for col in arrow_table_with_null.column_names:
         assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for 
{col}"
         assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row 
for {col}"
 
@@ -134,7 +138,7 @@ def test_query_filter_only_nulls_partitioned(
     # Then
     assert tbl.format_version == format_version, f"Expected v{format_version}, 
got: v{tbl.format_version}"
     df = spark.table(identifier)
-    for col in TEST_DATA_WITH_NULL.keys():
+    for col in arrow_table_with_only_nulls.column_names:
         assert df.where(f"{col} is null").count() == 2, f"Expected 2 row for 
{col}"
         assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows 
for {col}"
 
@@ -169,7 +173,7 @@ def test_query_filter_appended_null_partitioned(
     # Then
     assert tbl.format_version == format_version, f"Expected v{format_version}, 
got: v{tbl.format_version}"
     df = spark.table(identifier)
-    for col in TEST_DATA_WITH_NULL.keys():
+    for col in arrow_table_with_null.column_names:
         assert df.where(f"{col} is not null").count() == 6, f"Expected 6 
non-null rows for {col}"
         assert df.where(f"{col} is null").count() == 3, f"Expected 3 null rows 
for {col}"
     # expecting 6 files: first append with [A], [B], [C],  second append with 
[A, A], [B, B], [C, C]
@@ -212,7 +216,7 @@ def test_query_filter_v1_v2_append_null(
 
     # Then
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
-    for col in TEST_DATA_WITH_NULL.keys():  # type: ignore
+    for col in arrow_table_with_null.column_names:  # type: ignore
         df = spark.table(identifier)
         assert df.where(f"{col} is not null").count() == 4, f"Expected 4 
non-null rows for {col}"
         assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows 
for {col}"
@@ -422,7 +426,7 @@ def test_append_ymd_transform_partitioned(
     assert tbl.format_version == format_version, f"Expected v{format_version}, 
got: v{tbl.format_version}"
     df = spark.table(identifier)
     assert df.count() == 3, f"Expected 3 total rows for {identifier}"
-    for col in TEST_DATA_WITH_NULL.keys():
+    for col in arrow_table_with_null.column_names:
         assert df.where(f"{col} is not null").count() == 2, f"Expected 2 
non-null rows for {col}"
         assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row 
for {col} is null"
 
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index 4585406c..9a541bc8 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -37,12 +37,12 @@ from pyiceberg.catalog.hive import HiveCatalog
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import TableProperties, _dataframe_to_data_files
+from pyiceberg.table import TableProperties
 from pyiceberg.transforms import IdentityTransform
 from pyiceberg.types import IntegerType, NestedField
-from tests.conftest import TEST_DATA_WITH_NULL
 from utils import _create_table
 
 
@@ -124,52 +124,55 @@ def test_query_count(spark: SparkSession, format_version: 
int) -> None:
 
 
 @pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
 @pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_null(spark: SparkSession, col: str, format_version: int) 
-> None:
+def test_query_filter_null(spark: SparkSession, arrow_table_with_null: 
pa.Table, format_version: int) -> None:
     identifier = f"default.arrow_table_v{format_version}_with_null"
     df = spark.table(identifier)
-    assert df.where(f"{col} is null").count() == 1, f"Expected 1 row for {col}"
-    assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for 
{col}"
+    for col in arrow_table_with_null.column_names:
+        assert df.where(f"{col} is null").count() == 1, f"Expected 1 row for 
{col}"
+        assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows 
for {col}"
 
 
 @pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
 @pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_without_data(spark: SparkSession, col: str, 
format_version: int) -> None:
+def test_query_filter_without_data(spark: SparkSession, arrow_table_with_null: 
pa.Table, format_version: int) -> None:
     identifier = f"default.arrow_table_v{format_version}_without_data"
     df = spark.table(identifier)
-    assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}"
-    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for 
{col}"
+    for col in arrow_table_with_null.column_names:
+        assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for 
{col}"
+        assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row 
for {col}"
 
 
 @pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
 @pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_only_nulls(spark: SparkSession, col: str, 
format_version: int) -> None:
+def test_query_filter_only_nulls(spark: SparkSession, arrow_table_with_null: 
pa.Table, format_version: int) -> None:
     identifier = f"default.arrow_table_v{format_version}_with_only_nulls"
     df = spark.table(identifier)
-    assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for 
{col}"
-    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row for 
{col}"
+    for col in arrow_table_with_null.column_names:
+        assert df.where(f"{col} is null").count() == 2, f"Expected 2 rows for 
{col}"
+        assert df.where(f"{col} is not null").count() == 0, f"Expected 0 row 
for {col}"
 
 
 @pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
 @pytest.mark.parametrize("format_version", [1, 2])
-def test_query_filter_appended_null(spark: SparkSession, col: str, 
format_version: int) -> None:
+def test_query_filter_appended_null(spark: SparkSession, 
arrow_table_with_null: pa.Table, format_version: int) -> None:
     identifier = f"default.arrow_table_v{format_version}_appended_with_null"
     df = spark.table(identifier)
-    assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}"
-    assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for 
{col}"
+    for col in arrow_table_with_null.column_names:
+        assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for 
{col}"
+        assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows 
for {col}"
 
 
 @pytest.mark.integration
[email protected]("col", TEST_DATA_WITH_NULL.keys())
-def test_query_filter_v1_v2_append_null(spark: SparkSession, col: str) -> None:
+def test_query_filter_v1_v2_append_null(
+    spark: SparkSession,
+    arrow_table_with_null: pa.Table,
+) -> None:
     identifier = "default.arrow_table_v1_v2_appended_with_null"
     df = spark.table(identifier)
-    assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for {col}"
-    assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows for 
{col}"
+    for col in arrow_table_with_null.column_names:
+        assert df.where(f"{col} is null").count() == 2, f"Expected 1 row for 
{col}"
+        assert df.where(f"{col} is not null").count() == 4, f"Expected 2 rows 
for {col}"
 
 
 @pytest.mark.integration
@@ -187,10 +190,11 @@ def test_summaries(spark: SparkSession, session_catalog: 
Catalog, arrow_table_wi
     ).collect()
 
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append", "overwrite"]
+    assert operations == ["append", "append", "delete", "append"]
 
     summaries = [row.summary for row in rows]
 
+    # Append
     assert summaries[0] == {
         "added-data-files": "1",
         "added-files-size": "5459",
@@ -203,6 +207,7 @@ def test_summaries(spark: SparkSession, session_catalog: 
Catalog, arrow_table_wi
         "total-records": "3",
     }
 
+    # Append
     assert summaries[1] == {
         "added-data-files": "1",
         "added-files-size": "5459",
@@ -215,13 +220,24 @@ def test_summaries(spark: SparkSession, session_catalog: 
Catalog, arrow_table_wi
         "total-records": "6",
     }
 
+    # Delete
     assert summaries[2] == {
-        "added-data-files": "1",
-        "added-files-size": "5459",
-        "added-records": "3",
         "deleted-data-files": "2",
         "deleted-records": "6",
         "removed-files-size": "10918",
+        "total-data-files": "0",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "0",
+        "total-position-deletes": "0",
+        "total-records": "0",
+    }
+
+    # Overwrite
+    assert summaries[3] == {
+        "added-data-files": "1",
+        "added-files-size": "5459",
+        "added-records": "3",
         "total-data-files": "1",
         "total-delete-files": "0",
         "total-equality-deletes": "0",
@@ -249,9 +265,9 @@ def test_data_files(spark: SparkSession, session_catalog: 
Catalog, arrow_table_w
     """
     ).collect()
 
-    assert [row.added_data_files_count for row in rows] == [1, 1, 0, 1, 1]
+    assert [row.added_data_files_count for row in rows] == [1, 0, 1, 1, 1]
     assert [row.existing_data_files_count for row in rows] == [0, 0, 0, 0, 0]
-    assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0]
+    assert [row.deleted_data_files_count for row in rows] == [0, 1, 0, 0, 0]
 
 
 @pytest.mark.integration
@@ -556,7 +572,7 @@ def test_summaries_with_only_nulls(
     ).collect()
 
     operations = [row.operation for row in rows]
-    assert operations == ["append", "append", "overwrite"]
+    assert operations == ["append", "append", "delete", "append"]
 
     summaries = [row.summary for row in rows]
 
@@ -582,14 +598,23 @@ def test_summaries_with_only_nulls(
     }
 
     assert summaries[2] == {
+        "deleted-data-files": "1",
+        "deleted-records": "2",
         "removed-files-size": "4239",
+        "total-data-files": "0",
+        "total-delete-files": "0",
         "total-equality-deletes": "0",
+        "total-files-size": "0",
         "total-position-deletes": "0",
-        "deleted-data-files": "1",
+        "total-records": "0",
+    }
+
+    assert summaries[3] == {
+        "total-data-files": "0",
         "total-delete-files": "0",
+        "total-equality-deletes": "0",
         "total-files-size": "0",
-        "deleted-records": "2",
-        "total-data-files": "0",
+        "total-position-deletes": "0",
         "total-records": "0",
     }
 
@@ -812,13 +837,14 @@ def test_inspect_snapshots(
         assert isinstance(snapshot_id.as_py(), int)
 
     assert df["parent_id"][0].as_py() is None
-    assert df["parent_id"][1:] == df["snapshot_id"][:2]
+    assert df["parent_id"][1:].to_pylist() == 
df["snapshot_id"][:-1].to_pylist()
 
-    assert [operation.as_py() for operation in df["operation"]] == ["append", 
"overwrite", "append"]
+    assert [operation.as_py() for operation in df["operation"]] == ["append", 
"delete", "append", "append"]
 
     for manifest_list in df["manifest_list"]:
         assert manifest_list.as_py().startswith("s3://")
 
+    # Append
     assert df["summary"][0].as_py() == [
         ("added-files-size", "5459"),
         ("added-data-files", "1"),
@@ -831,6 +857,19 @@ def test_inspect_snapshots(
         ("total-equality-deletes", "0"),
     ]
 
+    # Delete
+    assert df["summary"][1].as_py() == [
+        ("removed-files-size", "5459"),
+        ("deleted-data-files", "1"),
+        ("deleted-records", "3"),
+        ("total-data-files", "0"),
+        ("total-delete-files", "0"),
+        ("total-records", "0"),
+        ("total-files-size", "0"),
+        ("total-position-deletes", "0"),
+        ("total-equality-deletes", "0"),
+    ]
+
     lhs = spark.table(f"{identifier}.snapshots").toPandas()
     rhs = df.to_pandas()
     for column in df.column_names:
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index fa346405..ff9d92ce 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -314,10 +314,6 @@ def test_invalid_operation() -> None:
         update_snapshot_summaries(summary=Summary(Operation.REPLACE))
     assert "Operation not implemented: Operation.REPLACE" in str(e.value)
 
-    with pytest.raises(ValueError) as e:
-        update_snapshot_summaries(summary=Summary(Operation.DELETE))
-    assert "Operation not implemented: Operation.DELETE" in str(e.value)
-
 
 def test_invalid_type() -> None:
     with pytest.raises(ValueError) as e:

Reply via email to