geruh commented on code in PR #3049:
URL: https://github.com/apache/iceberg-python/pull/3049#discussion_r2814553625
##########
tests/table/test_validate.py:
##########
@@ -350,3 +353,159 @@ class DummyEntry:
data_filter=None,
parent_snapshot=oldest_snapshot,
)
+
+
+@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
+def test_validate_added_delete_files_non_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+ operation: Operation,
+) -> None:
+ table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+ snapshot_history = 100
+ snapshots = table.snapshots()
+ for i in range(1, snapshot_history + 1):
+ altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
+ snapshots[-i] = altered_snapshot
+
+ table.metadata = table.metadata.model_copy(
+ update={"snapshots": snapshots},
+ )
+
+ oldest_snapshot = table.snapshots()[-snapshot_history]
+ newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+ """Mock the manifests method to use the snapshot_id for lookup."""
+ snapshot_id = self.snapshot_id
+ if snapshot_id in mock_manifests:
+ return mock_manifests[snapshot_id]
+ return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+ return [
+ ManifestEntry.from_args(
+                status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
+ )
+ ]
+
+ with (
+        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
+        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
+ ):
+ dfi = _added_delete_files(
+ table=table,
+ starting_snapshot=newest_snapshot,
+ data_filter=None,
+ parent_snapshot=oldest_snapshot,
+ partition_set=None,
+ )
+
+ assert dfi.is_empty()
+ assert len(dfi.referenced_data_files()) == 0
+
+
+@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
+def test_validate_added_delete_files_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+ operation: Operation,
+) -> None:
+ table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+ snapshot_history = 100
+ snapshots = table.snapshots()
+ for i in range(1, snapshot_history + 1):
+ altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
+ snapshots[-i] = altered_snapshot
+
+ table.metadata = table.metadata.model_copy(
+ update={"snapshots": snapshots},
+ )
+
+ oldest_snapshot = table.snapshots()[-snapshot_history]
+ newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+ mock_delete_file = DataFile.from_args(
+ content=DataFileContent.POSITION_DELETES,
+ file_path="s3://dummy/path",
+ )
+
+ mock_delete_file.spec_id = 0
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+ """Mock the manifests method to use the snapshot_id for lookup."""
+ snapshot_id = self.snapshot_id
+ if snapshot_id in mock_manifests:
+ return mock_manifests[snapshot_id]
+ return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+ result = [
+ ManifestEntry.from_args(
Review Comment:
I think this just creates and replaces the entry
##########
pyiceberg/table/update/validate.py:
##########
@@ -216,6 +225,61 @@ def _added_data_files(
yield entry
+def _added_delete_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ partition_set: dict[int, set[Record]] | None,
+ parent_snapshot: Snapshot | None,
+) -> DeleteFileIndex:
+    """Return matching delete files that have been added to the table since a starting snapshot.
+
+ Args:
+ table: Table to get the history from
+ starting_snapshot: Starting snapshot to get the history from
+ data_filter: Optional filter to match data files
+ partition_set: Optional set of partitions to match data files
+ parent_snapshot: Parent snapshot to get the history from
+
+ Returns:
+ DeleteFileIndex
+ """
+ if parent_snapshot is None or table.format_version < 2:
+ return DeleteFileIndex()
+
+ manifests, snapshot_ids = _validation_history(
+        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
+ )
+
+ dfi = DeleteFileIndex()
+
+ for manifest in manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
Review Comment:
we can discard the deleted entries here
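For example, a minimal sketch of that change, reusing only the names from the hunk above (not the final implementation):

```python
for manifest in manifests:
    # The default discard_deleted=True drops DELETED entries up front,
    # so only live entries reach the ADDED-status filter below.
    for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):
        if _filter_manifest_entries(
            entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
        ):
            dfi.add_delete_file(entry, entry.data_file.partition)
```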
##########
pyiceberg/table/update/validate.py:
##########
@@ -216,6 +225,61 @@ def _added_data_files(
yield entry
+def _added_delete_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ partition_set: dict[int, set[Record]] | None,
+ parent_snapshot: Snapshot | None,
+) -> DeleteFileIndex:
+    """Return matching delete files that have been added to the table since a starting snapshot.
+
+ Args:
+ table: Table to get the history from
+ starting_snapshot: Starting snapshot to get the history from
+ data_filter: Optional filter to match data files
+ partition_set: Optional set of partitions to match data files
+ parent_snapshot: Parent snapshot to get the history from
+
+ Returns:
+ DeleteFileIndex
+ """
+ if parent_snapshot is None or table.format_version < 2:
+ return DeleteFileIndex()
+
+ manifests, snapshot_ids = _validation_history(
+        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
+ )
+
+ dfi = DeleteFileIndex()
+
+ for manifest in manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+ if _filter_manifest_entries(
+                entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
+ ):
+ dfi.add_delete_file(entry, entry.data_file.partition)
+
+ return dfi
+
+
+def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
+ """Find the starting sequence number from a snapshot.
+
+ Args:
+ table: Table to find snapshot from
+ starting_snapshot: Snapshot from where to start looking
+
+    Returns:
+ Sequence number as int
+ """
+ if starting_snapshot is not None:
+ if snapshot := table.snapshot_by_id(starting_snapshot.snapshot_id):
Review Comment:
do we need to look this up if we pass in the starting_snapshot already?
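If the answer is no, a rough sketch of the simplification (assuming the caller always passes a snapshot of this table; the `0` fallback stands in for whatever default the current implementation uses):

```python
def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
    # The caller already holds the Snapshot, so its sequence number can be
    # read directly instead of re-resolving it via table.snapshot_by_id().
    if starting_snapshot is not None and starting_snapshot.sequence_number is not None:
        return starting_snapshot.sequence_number
    return 0  # placeholder for the existing "no snapshot" default
```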
##########
pyiceberg/table/update/validate.py:
##########
@@ -235,3 +299,60 @@ def _validate_added_data_files(
if any(conflicting_entries):
        conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
        raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
+
+
+def _validate_no_new_delete_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ partition_set: dict[int, set[Record]] | None,
+ parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate no new delete files matching a filter have been added to the table since a starting snapshot.
+
+ Args:
+ table: Table to validate
+ starting_snapshot: Snapshot current at the start of the operation
+ data_filter: Expression used to find added data files
+ partition_set: Dictionary of partition spec to set of partition records
+ parent_snapshot: Ending snapshot on the branch being validated
+ """
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)
+
+ if deletes.is_empty():
+ return
+
+ conflicting_delete_files = deletes.referenced_data_files()
+ raise ValidationException(
+        f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
+ )
+
+
+def _validate_no_new_delete_files_for_data_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ data_files: set[DataFile],
+ parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate that no new delete files that must be applied to the given data files have been added to the table since a starting snapshot.
+
+ Args:
+ table: Table to validate
+ starting_snapshot: Snapshot current at the start of the operation
+ data_filter: Expression used to find added data files
+ data_files: data files to validate have no new deletes
+ parent_snapshot: Ending snapshot on the branch being validated
+ """
+    # If there is no current state, or no files have been added
+ if parent_snapshot is None or table.format_version < 2:
+ return
+
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
+ seq_num = _starting_sequence_number(table, starting_snapshot)
+
+    # Fail if any delete file is found that applies to files written in or before the starting snapshot
+ for data_file in data_files:
+ delete_files = deletes.for_data_file(seq_num, data_file)
Review Comment:
We will need to pass in the partition key here for matching
`deletes.for_data_file(seq_num, data_file, data_file.partition)`
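In context that would make the loop look roughly like this (sketch only, using the names from the hunk above):

```python
for data_file in data_files:
    # Pass the data file's partition so partition-scoped delete files are
    # matched in addition to global ones.
    delete_files = deletes.for_data_file(seq_num, data_file, data_file.partition)
```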
##########
pyiceberg/table/update/validate.py:
##########
@@ -235,3 +299,60 @@ def _validate_added_data_files(
if any(conflicting_entries):
        conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
        raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
+
+
+def _validate_no_new_delete_files(
+ table: Table,
+ starting_snapshot: Snapshot,
+ data_filter: BooleanExpression | None,
+ partition_set: dict[int, set[Record]] | None,
+ parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate no new delete files matching a filter have been added to the table since a starting snapshot.
+
+ Args:
+ table: Table to validate
+ starting_snapshot: Snapshot current at the start of the operation
+ data_filter: Expression used to find added data files
+ partition_set: Dictionary of partition spec to set of partition records
+ parent_snapshot: Ending snapshot on the branch being validated
+ """
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)
+
+ if deletes.is_empty():
+ return
+
+ conflicting_delete_files = deletes.referenced_data_files()
+ raise ValidationException(
+        f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
Review Comment:
nit: this will dump the entire delete file list; we can just dump the locations here to avoid the noise.
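For example (sketch; assumes the returned entries expose `file_path` like other `DataFile`s):

```python
conflicting_paths = {delete_file.file_path for delete_file in deletes.referenced_data_files()}
raise ValidationException(
    f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_paths}"
)
```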
##########
pyiceberg/table/delete_file_index.py:
##########
@@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
deletes.update(path_deletes.filter_by_seq(seq_num))
return deletes
+
+ def referenced_data_files(self) -> list[DataFile]:
Review Comment:
nit: this is a bit confusing as it's technically the referenced delete file
list.
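Something like the following rename would make the intent explicit (illustrative signature only; `referenced_delete_files` is a suggested name, not an existing method):

```python
    def referenced_delete_files(self) -> list[DataFile]:
        """Return the delete files referenced by this index."""
        ...
```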
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]