This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new d587aab1 Add concurrency safety validation checks (#3049)
d587aab1 is described below
commit d587aab12bcd08c9f11708b7187f2d151c81ac94
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Fri Feb 20 20:46:50 2026 +0100
Add concurrency safety validation checks (#3049)
Closes #1930
Closes #1931
# Rationale for this change
Now that we have an implementation of `DeleteFileIndex`, we can move ahead
and implement the last two concurrency safety validations mentioned in #819.
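
For reviewers, a minimal sketch of how the two new checks could be wired
into a commit-time conflict check. The catalog name, table name, and
`files_to_rewrite` are illustrative placeholders, not part of this PR:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.manifest import DataFile
from pyiceberg.table.update.validate import (
    _validate_no_new_delete_files,
    _validate_no_new_deletes_for_data_files,
)

table = load_catalog("default").load_table("db.tbl")  # hypothetical table
starting_snapshot = table.current_snapshot()  # snapshot current when the operation began

# ... a concurrent writer commits new snapshots here ...
table.refresh()

# Raises ValidationException if conflicting delete files were added between
# the starting snapshot and the current branch tip.
_validate_no_new_delete_files(
    table=table,
    starting_snapshot=starting_snapshot,
    data_filter=None,  # optionally, the expression the operation scanned with
    partition_set=None,
    parent_snapshot=table.current_snapshot(),
)

# Raises ValidationException if a new delete file applies to any data file
# the operation plans to replace (empty placeholder set here).
files_to_rewrite: set[DataFile] = set()
_validate_no_new_deletes_for_data_files(
    table=table,
    starting_snapshot=starting_snapshot,
    data_filter=None,
    data_files=files_to_rewrite,
    parent_snapshot=table.current_snapshot(),
)
```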
## Are these changes tested?
Yes
## Are there any user-facing changes?
No
---
pyiceberg/table/delete_file_index.py | 15 ++++
pyiceberg/table/update/validate.py | 122 ++++++++++++++++++++++++++-
tests/table/test_validate.py | 155 ++++++++++++++++++++++++++++++++++-
3 files changed, 290 insertions(+), 2 deletions(-)
diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py
index 5c4323ea..3f513aab 100644
--- a/pyiceberg/table/delete_file_index.py
+++ b/pyiceberg/table/delete_file_index.py
@@ -54,6 +54,10 @@ class PositionDeletes:
start_idx = bisect_left(self._seqs, seq)
return [delete_file for delete_file, _ in self._files[start_idx:]]
+ def referenced_delete_files(self) -> list[DataFile]:
+ self._ensure_indexed()
+ return [data_file for data_file, _ in self._files]
+
def _has_path_bounds(delete_file: DataFile) -> bool:
lower = delete_file.lower_bounds
@@ -140,3 +144,14 @@ class DeleteFileIndex:
deletes.update(path_deletes.filter_by_seq(seq_num))
return deletes
+
+ def referenced_delete_files(self) -> list[DataFile]:
+ data_files: list[DataFile] = []
+
+ for deletes in self._by_partition.values():
+ data_files.extend(deletes.referenced_delete_files())
+
+ for deletes in self._by_path.values():
+ data_files.extend(deletes.referenced_delete_files())
+
+ return data_files
diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py
index 0cda688f..8178ed6e 100644
--- a/pyiceberg/table/update/validate.py
+++ b/pyiceberg/table/update/validate.py
@@ -19,14 +19,23 @@ from collections.abc import Iterator
from pyiceberg.exceptions import ValidationException
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+ INITIAL_SEQUENCE_NUMBER,
+ DataFile,
+ ManifestContent,
+ ManifestEntry,
+ ManifestEntryStatus,
+ ManifestFile,
+)
from pyiceberg.schema import Schema
from pyiceberg.table import Table
+from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record
VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, Operation.OVERWRITE}
def _validation_history(
@@ -216,6 +225,60 @@ def _added_data_files(
yield entry
+def _added_delete_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ partition_set: dict[int, set[Record]] | None,
+ parent_snapshot: Snapshot | None,
+) -> DeleteFileIndex:
+ """Return matching delete files that have been added to the table since a
starting snapshot.
+
+ Args:
+ table: Table to get the history from
+ starting_snapshot: Starting snapshot to get the history from
+ data_filter: Optional filter to match data files
+ partition_set: Optional set of partitions to match data files
+ parent_snapshot: Parent snapshot to get the history from
+
+ Returns:
+ DeleteFileIndex
+ """
+ if parent_snapshot is None or table.format_version < 2:
+ return DeleteFileIndex()
+
+ manifests, snapshot_ids = _validation_history(
+ table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
+ )
+
+ dfi = DeleteFileIndex()
+
+ for manifest in manifests:
+ for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):
+ if _filter_manifest_entries(
+ entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
+ ):
+ dfi.add_delete_file(entry, entry.data_file.partition)
+
+ return dfi
+
+
+def _starting_sequence_number(table: Table, starting_snapshot: Snapshot) -> int:
+ """Find the starting sequence number from a snapshot.
+
+ Args:
+ table: Table to find snapshot from
+ starting_snapshot: Snapshot from where to start looking
+
+ Returns:
+ Sequence number as int
+ """
+ if starting_snapshot is not None:
+ if seq := starting_snapshot.sequence_number:
+ return seq
+ return INITIAL_SEQUENCE_NUMBER
+
+
def _validate_added_data_files(
table: Table,
starting_snapshot: Snapshot,
@@ -235,3 +298,60 @@ def _validate_added_data_files(
if any(conflicting_entries):
conflicting_snapshots = {entry.snapshot_id for entry in
conflicting_entries if entry.snapshot_id is not None}
raise ValidationException(f"Added data files were found matching the
filter for snapshots {conflicting_snapshots}!")
+
+
+def _validate_no_new_delete_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ partition_set: dict[int, set[Record]] | None,
+ parent_snapshot: Snapshot | None,
+) -> None:
+ """Validate no new delete files matching a filter have been added to the
table since starting a snapshot.
+
+ Args:
+ table: Table to validate
+ starting_snapshot: Snapshot current at the start of the operation
+ data_filter: Expression used to find added data files
+ partition_set: Dictionary of partition spec to set of partition records
+ parent_snapshot: Ending snapshot on the branch being validated
+ """
+ deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)
+
+ if deletes.is_empty():
+ return
+
+ conflicting_delete_paths = [file.file_path for file in deletes.referenced_delete_files()]
+ raise ValidationException(
+ f"Found new conflicting delete files that can apply to records
matching {data_filter}: {conflicting_delete_paths}"
+ )
+
+
+def _validate_no_new_deletes_for_data_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ data_files: set[DataFile],
+ parent_snapshot: Snapshot | None,
+) -> None:
+ """Validate no new delete files must be applied for data files that have
been added to the table since a starting snapshot.
+
+ Args:
+ table: Table to validate
+ starting_snapshot: Snapshot current at the start of the operation
+ data_filter: Expression used to find added data files
+ data_files: data files to validate have no new deletes
+ parent_snapshot: Ending snapshot on the branch being validated
+ """
+ # If there is no current state, or no delete files can have been added (format v1)
+ if parent_snapshot is None or table.format_version < 2:
+ return
+
+ deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
+ seq_num = _starting_sequence_number(table, starting_snapshot)
+
+ # Fail if any delete file is found that applies to files written in or before the starting snapshot
+ for data_file in data_files:
+ delete_files = deletes.for_data_file(seq_num, data_file, data_file.partition)
+ if len(delete_files) > 0:
+ raise ValidationException(f"Cannot commit, found new delete for
replace data file {data_file}")
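
A note for reviewers on the sequence-number gate above: `_starting_sequence_number`
anchors the check, and the index's bisect-based lookup then considers only delete
files committed at or after that point. A standalone sketch of the idea with
hypothetical sample data (this is not the PyIceberg API):

```python
from bisect import bisect_left

# (sequence number, delete file path) pairs kept sorted by sequence number,
# mirroring how PositionDeletes orders its files.
pairs = [(1, "d1.parquet"), (3, "d2.parquet"), (5, "d3.parquet")]
seqs = [seq for seq, _ in pairs]

starting_seq = 3  # sequence number of the snapshot the operation started from
start_idx = bisect_left(seqs, starting_seq)

# Only delete files at or above the starting sequence number can conflict.
assert [path for _, path in pairs[start_idx:]] == ["d2.parquet", "d3.parquet"]
```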
diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py
index 570f6808..cb9d80e1 100644
--- a/tests/table/test_validate.py
+++ b/tests/table/test_validate.py
@@ -22,14 +22,17 @@ import pytest
from pyiceberg.exceptions import ValidationException
from pyiceberg.io import FileIO
-from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import (
_added_data_files,
+ _added_delete_files,
_deleted_data_files,
_validate_added_data_files,
_validate_deleted_data_files,
+ _validate_no_new_delete_files,
+ _validate_no_new_deletes_for_data_files,
_validation_history,
)
@@ -350,3 +353,153 @@ def test_validate_added_data_files_raises_on_conflict(
data_filter=None,
parent_snapshot=oldest_snapshot,
)
+
+
[email protected]("operation", [Operation.APPEND, Operation.REPLACE])
+def test_added_delete_files_non_conflicting_count(
+ table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+ operation: Operation,
+) -> None:
+ table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+ snapshot_history = 100
+ snapshots = table.snapshots()
+ for i in range(1, snapshot_history + 1):
+ altered_snapshot = snapshots[-i]
+ altered_snapshot = altered_snapshot.model_copy(update={"summary":
Summary(operation=operation)})
+ snapshots[-i] = altered_snapshot
+
+ table.metadata = table.metadata.model_copy(
+ update={"snapshots": snapshots},
+ )
+
+ oldest_snapshot = table.snapshots()[-snapshot_history]
+ newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+ """Mock the manifests method to use the snapshot_id for lookup."""
+ snapshot_id = self.snapshot_id
+ if snapshot_id in mock_manifests:
+ return mock_manifests[snapshot_id]
+ return []
+
+ def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+ return [
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
+ )
+ ]
+
+ with (
+ patch("pyiceberg.table.snapshots.Snapshot.manifests",
new=mock_read_manifest_side_effect),
+ patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry",
new=mock_fetch_manifest_entry),
+ ):
+ dfi = _added_delete_files(
+ table=table,
+ starting_snapshot=newest_snapshot,
+ data_filter=None,
+ parent_snapshot=oldest_snapshot,
+ partition_set=None,
+ )
+
+ assert dfi.is_empty()
+ assert len(dfi.referenced_delete_files()) == 0
+
+
[email protected]("operation", [Operation.DELETE, Operation.OVERWRITE])
+def test_added_delete_files_conflicting_count(
+ table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+ operation: Operation,
+) -> None:
+ table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+ snapshot_history = 100
+ snapshots = table.snapshots()
+ for i in range(1, snapshot_history + 1):
+ altered_snapshot = snapshots[-i]
+ altered_snapshot = altered_snapshot.model_copy(update={"summary":
Summary(operation=operation)})
+ snapshots[-i] = altered_snapshot
+
+ table.metadata = table.metadata.model_copy(
+ update={"snapshots": snapshots},
+ )
+
+ oldest_snapshot = table.snapshots()[-snapshot_history]
+ newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+ mock_delete_file = DataFile.from_args(
+ content=DataFileContent.POSITION_DELETES,
+ file_path="s3://dummy/path",
+ )
+
+ mock_delete_file.spec_id = 0
+
+ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+ """Mock the manifests method to use the snapshot_id for lookup."""
+ snapshot_id = self.snapshot_id
+ if snapshot_id in mock_manifests:
+ return mock_manifests[snapshot_id]
+ return []
+
+ def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+ return [
+ ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=self.added_snapshot_id,
+ sequence_number=self.min_sequence_number,
+ data_file=mock_delete_file,
+ )
+ ]
+
+ with (
+ patch("pyiceberg.table.snapshots.Snapshot.manifests",
new=mock_read_manifest_side_effect),
+ patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry",
new=mock_fetch_manifest_entry),
+ ):
+ dfi = _added_delete_files(
+ table=table,
+ starting_snapshot=newest_snapshot,
+ data_filter=None,
+ parent_snapshot=oldest_snapshot,
+ partition_set=None,
+ )
+
+ assert not dfi.is_empty()
+ assert dfi.referenced_delete_files()[0] == mock_delete_file
+
+
+def test_validate_no_new_delete_files_raises_on_conflict(
+ table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+ table, _ = table_v2_with_extensive_snapshots_and_manifests
+ oldest_snapshot = table.snapshots()[0]
+ newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+ with patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty",
return_value=False):
+ with pytest.raises(ValidationException):
+ _validate_no_new_delete_files(
+ table=table,
+ starting_snapshot=newest_snapshot,
+ data_filter=None,
+ partition_set=None,
+ parent_snapshot=oldest_snapshot,
+ )
+
+
+def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
+ table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+ table, _ = table_v2_with_extensive_snapshots_and_manifests
+ oldest_snapshot = table.snapshots()[0]
+ newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+ mocked_data_file = DataFile.from_args()
+
+ with patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[mocked_data_file]):
+ with pytest.raises(ValidationException):
+ _validate_no_new_deletes_for_data_files(
+ table=table,
+ starting_snapshot=newest_snapshot,
+ data_filter=None,
+ data_files={mocked_data_file},
+ parent_snapshot=oldest_snapshot,
+ )