This is an automated email from the ASF dual-hosted git repository.
sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new e6f6018a feat: `validation_history` and `ancestors_between` (#1935)
e6f6018a is described below
commit e6f6018a3fcfbd95962cac03f952109e8300ae42
Author: Jayce Slesar <[email protected]>
AuthorDate: Thu May 1 08:18:34 2025 -0400
feat: `validation_history` and `ancestors_between` (#1935)
---
pyiceberg/exceptions.py | 4 ++
pyiceberg/table/snapshots.py | 13 ++++
pyiceberg/table/update/validate.py | 71 +++++++++++++++++++
tests/table/test_init.py | 39 -----------
tests/table/test_snapshots.py | 68 +++++++++++++++++-
tests/table/test_validate.py | 138 +++++++++++++++++++++++++++++++++++++
6 files changed, 293 insertions(+), 40 deletions(-)
diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py
index 56574ff4..c80f104e 100644
--- a/pyiceberg/exceptions.py
+++ b/pyiceberg/exceptions.py
@@ -122,3 +122,7 @@ class CommitStateUnknownException(RESTError):
class WaitingForLockException(Exception):
    """Signal that a lock is currently unavailable and the operation should be retried."""
+
+
class ValidationException(Exception):
    """Signal that a validation check failed.

    The exception message describes which check did not hold.
    """
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 1e48126c..bf18a2b1 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -435,3 +435,16 @@ def ancestors_of(current_snapshot: Optional[Snapshot],
table_metadata: TableMeta
if snapshot.parent_snapshot_id is None:
break
snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id)
+
+
def ancestors_between(
    to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata
) -> Iterable[Snapshot]:
    """Yield ``to_snapshot`` and its ancestors, newest first, stopping at ``from_snapshot``.

    Both bounds are inclusive: ``to_snapshot`` is always yielded first and, when
    ``from_snapshot`` is found in the lineage, it is the last snapshot yielded.

    Args:
        to_snapshot: The newest snapshot; the walk starts here.
        from_snapshot: The oldest snapshot to include. When ``None`` the walk is
            not bounded and continues for as long as ``ancestors_of`` yields.
        table_metadata: Metadata used to resolve parent snapshot IDs.

    Returns:
        A lazy iterable over the snapshots from ``to_snapshot`` back towards
        ``from_snapshot``. If ``from_snapshot`` is never encountered, every
        ancestor is yielded, so callers that require the bound to be reached must
        check the last yielded snapshot themselves.
    """
    if from_snapshot is None:
        yield from ancestors_of(to_snapshot, table_metadata)
        return

    # Snapshot IDs identify a snapshot within a table, so compare IDs instead of
    # full model equality: cheaper, and independent of how each object was built.
    stop_id = from_snapshot.snapshot_id
    for snapshot in ancestors_of(to_snapshot, table_metadata):
        yield snapshot
        if snapshot.snapshot_id == stop_id:
            return
diff --git a/pyiceberg/table/update/validate.py
b/pyiceberg/table/update/validate.py
new file mode 100644
index 00000000..7caaf1d5
--- /dev/null
+++ b/pyiceberg/table/update/validate.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyiceberg.exceptions import ValidationException
+from pyiceberg.manifest import ManifestContent, ManifestFile
+from pyiceberg.table import Table
+from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
+
+
def validation_history(
    table: Table,
    to_snapshot: Snapshot,
    from_snapshot: Snapshot,
    matching_operations: set[Operation],
    manifest_content_filter: ManifestContent,
) -> tuple[list[ManifestFile], set[int]]:
    """Return manifests added, and snapshot IDs, in the history between two snapshots.

    The lineage is walked from ``to_snapshot`` back to ``from_snapshot`` (both
    inclusive). For every snapshot whose summary operation is in
    ``matching_operations``, its ID is collected along with the manifests that
    snapshot added itself whose content equals ``manifest_content_filter``.

    Args:
        table: Table to get the history from.
        to_snapshot: Newest snapshot; the walk starts here.
        from_snapshot: Oldest snapshot; the walk is expected to end on it.
        matching_operations: Snapshot operations to collect.
        manifest_content_filter: Manifest content type to keep.

    Raises:
        ValidationException: If a snapshot in the walk has no summary, or if the
            walk ended on a snapshot other than ``from_snapshot`` (i.e. it was not
            reached from ``to_snapshot``).

    Returns:
        The matching manifest files and the set of matching snapshot IDs.
    """
    manifest_files: list[ManifestFile] = []
    snapshot_ids: set[int] = set()

    last_snapshot = None
    for snapshot in ancestors_between(to_snapshot, from_snapshot, table.metadata):
        last_snapshot = snapshot
        summary = snapshot.summary
        if summary is None:
            raise ValidationException(f"No summary found for snapshot {snapshot}!")
        if summary.operation not in matching_operations:
            continue

        snapshot_ids.add(snapshot.snapshot_id)
        # Keep only manifests this snapshot added itself; manifests it inherited
        # from earlier snapshots are excluded here.
        # TODO: the manifest reads could run in a thread pool and be merged at the end.
        manifest_files.extend(
            manifest
            for manifest in snapshot.manifests(table.io)
            if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter
        )

    # ancestors_between keeps walking when from_snapshot is never met, so the
    # walk is only valid if it ended exactly on from_snapshot.
    if last_snapshot is not None and last_snapshot.snapshot_id != from_snapshot.snapshot_id:
        raise ValidationException("No matching snapshot found.")

    return manifest_files, snapshot_ids
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index acc31d47..dbac84bd 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -57,7 +57,6 @@ from pyiceberg.table.snapshots import (
Snapshot,
SnapshotLogEntry,
Summary,
- ancestors_of,
)
from pyiceberg.table.sorting import (
NullOrder,
@@ -225,44 +224,6 @@ def test_snapshot_by_timestamp(table_v2: Table) -> None:
assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False)
is None
-def test_ancestors_of(table_v2: Table) -> None:
- assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata))
== [
- Snapshot(
- snapshot_id=3055729675574597004,
- parent_snapshot_id=3051729675574597004,
- sequence_number=1,
- timestamp_ms=1555100955770,
- manifest_list="s3://a/b/2.avro",
- summary=Summary(Operation.APPEND),
- schema_id=1,
- ),
- Snapshot(
- snapshot_id=3051729675574597004,
- parent_snapshot_id=None,
- sequence_number=0,
- timestamp_ms=1515100955770,
- manifest_list="s3://a/b/1.avro",
- summary=Summary(Operation.APPEND),
- schema_id=None,
- ),
- ]
-
-
-def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots:
Table) -> None:
- # Test RecursionError: maximum recursion depth exceeded
- assert (
- len(
- list(
- ancestors_of(
- table_v2_with_extensive_snapshots.current_snapshot(),
- table_v2_with_extensive_snapshots.metadata,
- )
- )
- )
- == 2000
- )
-
-
def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None:
    """Looking up an ID that is absent from the metadata yields ``None``."""
    unknown_snapshot_id = -1
    found = table_v2.snapshot_by_id(unknown_snapshot_id)
    assert found is None
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 3f0dae14..2395ea77 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -15,12 +15,23 @@
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name,eval-used
+from typing import cast
+
import pytest
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent,
ManifestFile
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table.snapshots import Operation, Snapshot,
SnapshotSummaryCollector, Summary, update_snapshot_summaries
+from pyiceberg.table import Table
+from pyiceberg.table.snapshots import (
+ Operation,
+ Snapshot,
+ SnapshotSummaryCollector,
+ Summary,
+ ancestors_between,
+ ancestors_of,
+ update_snapshot_summaries,
+)
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Record
from pyiceberg.types import (
@@ -368,3 +379,58 @@ def test_invalid_type() -> None:
)
assert "Could not parse summary property total-data-files to an int: abc"
in str(e.value)
+
+
def test_ancestors_of(table_v2: Table) -> None:
    """The current snapshot and its parent are returned, newest first."""
    current = Snapshot(
        snapshot_id=3055729675574597004,
        parent_snapshot_id=3051729675574597004,
        sequence_number=1,
        timestamp_ms=1555100955770,
        manifest_list="s3://a/b/2.avro",
        summary=Summary(Operation.APPEND),
        schema_id=1,
    )
    root = Snapshot(
        snapshot_id=3051729675574597004,
        parent_snapshot_id=None,
        sequence_number=0,
        timestamp_ms=1515100955770,
        manifest_list="s3://a/b/1.avro",
        summary=Summary(Operation.APPEND),
        schema_id=None,
    )

    lineage = list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata))
    assert lineage == [current, root]
+
+
def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None:
    """Walking a 2000-snapshot lineage must NOT raise ``RecursionError``.

    The chain is deeper than CPython's default recursion limit (1000), so this
    guards against ``ancestors_of`` being implemented recursively.
    """
    table = table_v2_with_extensive_snapshots
    lineage = list(ancestors_of(table.current_snapshot(), table.metadata))
    assert len(lineage) == 2000
+
+
def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
    """The walk starts at to_snapshot and stops at (and includes) from_snapshot."""
    table = table_v2_with_extensive_snapshots
    oldest_snapshot = table.snapshots()[0]
    current_snapshot = cast(Snapshot, table.current_snapshot())

    lineage = list(ancestors_between(current_snapshot, oldest_snapshot, table.metadata))

    assert len(lineage) == 2000
    # Both bounds are inclusive.
    assert lineage[0].snapshot_id == current_snapshot.snapshot_id
    assert lineage[-1].snapshot_id == oldest_snapshot.snapshot_id

    # A bound in the middle of the lineage must stop the walk right there.
    middle_snapshot = lineage[10]
    partial = list(ancestors_between(current_snapshot, middle_snapshot, table.metadata))
    assert [s.snapshot_id for s in partial] == [s.snapshot_id for s in lineage[:11]]

    # Without a bound the whole lineage is returned.
    assert len(list(ancestors_between(current_snapshot, None, table.metadata))) == 2000
diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py
new file mode 100644
index 00000000..eac3733f
--- /dev/null
+++ b/tests/table/test_validate.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name,eval-used
+from typing import cast
+from unittest.mock import patch
+
+import pytest
+
+from pyiceberg.exceptions import ValidationException
+from pyiceberg.io import FileIO
+from pyiceberg.manifest import ManifestContent, ManifestFile
+from pyiceberg.table import Table
+from pyiceberg.table.snapshots import Operation, Snapshot
+from pyiceberg.table.update.validate import validation_history
+
+
@pytest.fixture
def table_v2_with_extensive_snapshots_and_manifests(
    table_v2_with_extensive_snapshots: Table,
) -> tuple[Table, dict[int, list[ManifestFile]]]:
    """Pair the extensive-snapshot table with one mock manifest per snapshot.

    Manifests alternate by position in ``snapshots()``: even positions are DATA,
    odd positions are DELETES. Each manifest records its own snapshot as
    ``added_snapshot_id`` and is keyed by that snapshot ID.
    """
    mock_manifests: dict[int, list[ManifestFile]] = {}

    for position, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()):
        content = ManifestContent.DELETES if position % 2 else ManifestContent.DATA
        mock_manifests[snapshot.snapshot_id] = [
            ManifestFile.from_args(
                manifest_path=f"foo/bar/{position}",
                manifest_length=1,
                partition_spec_id=1,
                content=content,
                sequence_number=1,
                min_sequence_number=1,
                added_snapshot_id=snapshot.snapshot_id,
            )
        ]

    return table_v2_with_extensive_snapshots, mock_manifests
+
+
def test_validation_history(table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]]) -> None:
    """Walking the full lineage returns every DATA manifest and every APPEND snapshot ID."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    expected_data_manifests = sum(1 for manifests in mock_manifests.values() if manifests[0].content == ManifestContent.DATA)

    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Stand-in for ``Snapshot.manifests`` that looks manifests up by snapshot ID."""
        return mock_manifests.get(self.snapshot_id, [])

    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect):
        manifests, snapshots = validation_history(
            table,
            newest_snapshot,
            oldest_snapshot,
            {Operation.APPEND},
            ManifestContent.DATA,
        )

    assert len(manifests) == expected_data_manifests
    # The content filter must be honored.
    assert all(manifest.content == ManifestContent.DATA for manifest in manifests)
    # Assumes every fixture snapshot is an APPEND, so the whole lineage is reported.
    assert snapshots == set(mock_manifests)
+
+
def test_validation_history_fails_on_snapshot_with_no_summary(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """A snapshot without a summary in the walked history raises ValidationException."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    # Use real Snapshot fields: ``parent_id`` and ``operation`` are not Snapshot
    # fields (the operation lives on the Summary), and IDs are integers.
    snapshot_with_no_summary = Snapshot(
        snapshot_id=1234,
        parent_snapshot_id=5678,
        timestamp_ms=0,
        summary=None,
        manifest_list="foo/bar",
    )
    with patch("pyiceberg.table.update.validate.ancestors_between", return_value=[snapshot_with_no_summary]):
        # Pin the specific failure so an unrelated ValidationException can't satisfy the test.
        with pytest.raises(ValidationException, match="No summary found for snapshot"):
            validation_history(
                table,
                newest_snapshot,
                oldest_snapshot,
                {Operation.APPEND},
                ManifestContent.DATA,
            )
+
+
def test_validation_history_fails_on_from_snapshot_not_matching_last_snapshot(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """A history that never reaches from_snapshot raises ValidationException."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Stand-in for ``Snapshot.manifests`` that looks manifests up by snapshot ID."""
        return mock_manifests.get(self.snapshot_id, [])

    # Simulate an ancestor walk (newest first) that ends one snapshot short of
    # from_snapshot: every snapshot except the oldest, in reverse order.
    history_without_oldest = table.snapshots()[:0:-1]

    with (
        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
        patch("pyiceberg.table.update.validate.ancestors_between", return_value=history_without_oldest),
    ):
        # Pin the specific failure so a missing-summary error can't satisfy the test.
        with pytest.raises(ValidationException, match="No matching snapshot found"):
            validation_history(
                table,
                newest_snapshot,
                oldest_snapshot,
                {Operation.APPEND},
                ManifestContent.DATA,
            )