This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new d587aab1 Add concurrency safety validation checks (#3049)
d587aab1 is described below

commit d587aab12bcd08c9f11708b7187f2d151c81ac94
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Fri Feb 20 20:46:50 2026 +0100

    Add concurrency safety validation checks (#3049)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    Closes #1930
    Closes #1931
    
    # Rationale for this change
    
Now that we have an implementation of DeleteFileIndex we can move ahead
and implement the last two concurrency safety validations mentioned in
#819
    
    ## Are these changes tested?
    
    Yes
    
    ## Are there any user-facing changes?
    
    No
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
---
 pyiceberg/table/delete_file_index.py |  15 ++++
 pyiceberg/table/update/validate.py   | 122 ++++++++++++++++++++++++++-
 tests/table/test_validate.py         | 155 ++++++++++++++++++++++++++++++++++-
 3 files changed, 290 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/table/delete_file_index.py 
b/pyiceberg/table/delete_file_index.py
index 5c4323ea..3f513aab 100644
--- a/pyiceberg/table/delete_file_index.py
+++ b/pyiceberg/table/delete_file_index.py
@@ -54,6 +54,10 @@ class PositionDeletes:
         start_idx = bisect_left(self._seqs, seq)
         return [delete_file for delete_file, _ in self._files[start_idx:]]
 
+    def referenced_delete_files(self) -> list[DataFile]:
+        self._ensure_indexed()
+        return [data_file for data_file, _ in self._files]
+
 
 def _has_path_bounds(delete_file: DataFile) -> bool:
     lower = delete_file.lower_bounds
@@ -140,3 +144,14 @@ class DeleteFileIndex:
             deletes.update(path_deletes.filter_by_seq(seq_num))
 
         return deletes
+
+    def referenced_delete_files(self) -> list[DataFile]:
+        data_files: list[DataFile] = []
+
+        for deletes in self._by_partition.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        for deletes in self._by_path.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        return data_files
diff --git a/pyiceberg/table/update/validate.py 
b/pyiceberg/table/update/validate.py
index 0cda688f..8178ed6e 100644
--- a/pyiceberg/table/update/validate.py
+++ b/pyiceberg/table/update/validate.py
@@ -19,14 +19,23 @@ from collections.abc import Iterator
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, 
_InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
+from pyiceberg.table.delete_file_index import DeleteFileIndex
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, 
Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, 
Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, 
Operation.OVERWRITE}
 
 
 def _validation_history(
@@ -216,6 +225,60 @@ def _added_data_files(
                 yield entry
 
 
+def _added_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    partition_set: dict[int, set[Record]] | None,
+    parent_snapshot: Snapshot | None,
+) -> DeleteFileIndex:
+    """Return matching delete files that have been added to the table since a 
starting snapshot.
+
+    Args:
+        table: Table to get the history from
+        starting_snapshot: Starting snapshot to get the history from
+        data_filter: Optional filter to match data files
+        partition_set: Optional set of partitions to match data files
+        parent_snapshot: Parent snapshot to get the history from
+
+    Returns:
+        DeleteFileIndex
+    """
+    if parent_snapshot is None or table.format_version < 2:
+        return DeleteFileIndex()
+
+    manifests, snapshot_ids = _validation_history(
+        table, parent_snapshot, starting_snapshot, 
VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
+    )
+
+    dfi = DeleteFileIndex()
+
+    for manifest in manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, 
discard_deleted=True):
+            if _filter_manifest_entries(
+                entry, snapshot_ids, data_filter, partition_set, 
ManifestEntryStatus.ADDED, table.schema()
+            ):
+                dfi.add_delete_file(entry, entry.data_file.partition)
+
+    return dfi
+
+
+def _starting_sequence_number(table: Table, starting_snapshot: Snapshot) -> 
int:
+    """Find the starting sequence number from a snapshot.
+
+    Args:
+        table: Table to find snapshot from
+        starting_snapshot: Snapshot from where to start looking
+
+    Returns:
+        Sequence number as int
+    """
+    if starting_snapshot is not None:
+        if seq := starting_snapshot.sequence_number:
+            return seq
+    return INITIAL_SEQUENCE_NUMBER
+
+
 def _validate_added_data_files(
     table: Table,
     starting_snapshot: Snapshot,
@@ -235,3 +298,60 @@ def _validate_added_data_files(
     if any(conflicting_entries):
         conflicting_snapshots = {entry.snapshot_id for entry in 
conflicting_entries if entry.snapshot_id is not None}
         raise ValidationException(f"Added data files were found matching the 
filter for snapshots {conflicting_snapshots}!")
+
+
+def _validate_no_new_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    partition_set: dict[int, set[Record]] | None,
+    parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate no new delete files matching a filter have been added to the 
table since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find added data files
+        partition_set: Dictionary of partition spec to set of partition records
+        parent_snapshot: Ending snapshot on the branch being validated
+    """
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, 
partition_set, parent_snapshot)
+
+    if deletes.is_empty():
+        return
+
+    conflicting_delete_paths = [file.file_path for file in 
deletes.referenced_delete_files()]
+    raise ValidationException(
+        f"Found new conflicting delete files that can apply to records 
matching {data_filter}: {conflicting_delete_paths}"
+    )
+
+
+def _validate_no_new_deletes_for_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    data_files: set[DataFile],
+    parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate no new delete files must be applied for data files that have 
been added to the table since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find added data files
+        data_files: data files to validate have no new deletes
+        parent_snapshot: Ending snapshot on the branch being validated
+    """
+    # If there is no current state, or no files have been added
+    if parent_snapshot is None or table.format_version < 2:
+        return
+
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, None, 
parent_snapshot)
+    seq_num = _starting_sequence_number(table, starting_snapshot)
+
+    # Fail on any delete file found that applies to files written in or before 
the starting snapshot
+    for data_file in data_files:
+        delete_files = deletes.for_data_file(seq_num, data_file, 
data_file.partition)
+        if len(delete_files) > 0:
+            raise ValidationException(f"Cannot commit, found new delete for 
replace data file {data_file}")
diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py
index 570f6808..cb9d80e1 100644
--- a/tests/table/test_validate.py
+++ b/tests/table/test_validate.py
@@ -22,14 +22,17 @@ import pytest
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.io import FileIO
-from pyiceberg.manifest import ManifestContent, ManifestEntry, 
ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, 
ManifestEntry, ManifestEntryStatus, ManifestFile
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, Summary
 from pyiceberg.table.update.validate import (
     _added_data_files,
+    _added_delete_files,
     _deleted_data_files,
     _validate_added_data_files,
     _validate_deleted_data_files,
+    _validate_no_new_delete_files,
+    _validate_no_new_deletes_for_data_files,
     _validation_history,
 )
 
@@ -350,3 +353,153 @@ def test_validate_added_data_files_raises_on_conflict(
                 data_filter=None,
                 parent_snapshot=oldest_snapshot,
             )
+
+
[email protected]("operation", [Operation.APPEND, Operation.REPLACE])
+def test_added_delete_files_non_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, 
list[ManifestFile]]],
+    operation: Operation,
+) -> None:
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    snapshot_history = 100
+    snapshots = table.snapshots()
+    for i in range(1, snapshot_history + 1):
+        altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": 
Summary(operation=operation)})
+        snapshots[-i] = altered_snapshot
+
+    table.metadata = table.metadata.model_copy(
+        update={"snapshots": snapshots},
+    )
+
+    oldest_snapshot = table.snapshots()[-snapshot_history]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> 
list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, 
discard_deleted: bool = True) -> list[ManifestEntry]:
+        return [
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.ADDED, 
snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
+            )
+        ]
+
+    with (
+        patch("pyiceberg.table.snapshots.Snapshot.manifests", 
new=mock_read_manifest_side_effect),
+        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", 
new=mock_fetch_manifest_entry),
+    ):
+        dfi = _added_delete_files(
+            table=table,
+            starting_snapshot=newest_snapshot,
+            data_filter=None,
+            parent_snapshot=oldest_snapshot,
+            partition_set=None,
+        )
+
+        assert dfi.is_empty()
+        assert len(dfi.referenced_delete_files()) == 0
+
+
[email protected]("operation", [Operation.DELETE, Operation.OVERWRITE])
+def test_added_delete_files_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, 
list[ManifestFile]]],
+    operation: Operation,
+) -> None:
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    snapshot_history = 100
+    snapshots = table.snapshots()
+    for i in range(1, snapshot_history + 1):
+        altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": 
Summary(operation=operation)})
+        snapshots[-i] = altered_snapshot
+
+    table.metadata = table.metadata.model_copy(
+        update={"snapshots": snapshots},
+    )
+
+    oldest_snapshot = table.snapshots()[-snapshot_history]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    mock_delete_file = DataFile.from_args(
+        content=DataFileContent.POSITION_DELETES,
+        file_path="s3://dummy/path",
+    )
+
+    mock_delete_file.spec_id = 0
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> 
list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, 
discard_deleted: bool = True) -> list[ManifestEntry]:
+        return [
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.ADDED,
+                snapshot_id=self.added_snapshot_id,
+                sequence_number=self.min_sequence_number,
+                data_file=mock_delete_file,
+            )
+        ]
+
+    with (
+        patch("pyiceberg.table.snapshots.Snapshot.manifests", 
new=mock_read_manifest_side_effect),
+        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", 
new=mock_fetch_manifest_entry),
+    ):
+        dfi = _added_delete_files(
+            table=table,
+            starting_snapshot=newest_snapshot,
+            data_filter=None,
+            parent_snapshot=oldest_snapshot,
+            partition_set=None,
+        )
+
+        assert not dfi.is_empty()
+        assert dfi.referenced_delete_files()[0] == mock_delete_file
+
+
+def test_validate_no_new_delete_files_raises_on_conflict(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, 
list[ManifestFile]]],
+) -> None:
+    table, _ = table_v2_with_extensive_snapshots_and_manifests
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    with patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty", 
return_value=False):
+        with pytest.raises(ValidationException):
+            _validate_no_new_delete_files(
+                table=table,
+                starting_snapshot=newest_snapshot,
+                data_filter=None,
+                partition_set=None,
+                parent_snapshot=oldest_snapshot,
+            )
+
+
+def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, 
list[ManifestFile]]],
+) -> None:
+    table, _ = table_v2_with_extensive_snapshots_and_manifests
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    mocked_data_file = DataFile.from_args()
+
+    with 
patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", 
return_value=[mocked_data_file]):
+        with pytest.raises(ValidationException):
+            _validate_no_new_deletes_for_data_files(
+                table=table,
+                starting_snapshot=newest_snapshot,
+                data_filter=None,
+                data_files={mocked_data_file},
+                parent_snapshot=oldest_snapshot,
+            )

Reply via email to