This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new e6f6018a feat: `validation_history` and `ancestors_between` (#1935)
e6f6018a is described below

commit e6f6018a3fcfbd95962cac03f952109e8300ae42
Author: Jayce Slesar <[email protected]>
AuthorDate: Thu May 1 08:18:34 2025 -0400

    feat: `validation_history` and `ancestors_between` (#1935)
---
 pyiceberg/exceptions.py            |   4 ++
 pyiceberg/table/snapshots.py       |  13 ++++
 pyiceberg/table/update/validate.py |  71 +++++++++++++++++++
 tests/table/test_init.py           |  39 -----------
 tests/table/test_snapshots.py      |  68 +++++++++++++++++-
 tests/table/test_validate.py       | 138 +++++++++++++++++++++++++++++++++++++
 6 files changed, 293 insertions(+), 40 deletions(-)

diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py
index 56574ff4..c80f104e 100644
--- a/pyiceberg/exceptions.py
+++ b/pyiceberg/exceptions.py
@@ -122,3 +122,7 @@ class CommitStateUnknownException(RESTError):
 
 class WaitingForLockException(Exception):
     """Need to wait for a lock, try again."""
+
+
+class ValidationException(Exception):
+    """Raised when validation fails."""
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index 1e48126c..bf18a2b1 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -435,3 +435,16 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta
         if snapshot.parent_snapshot_id is None:
             break
         snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id)
+
+
+def ancestors_between(
+    to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata
+) -> Iterable[Snapshot]:
+    """Get the ancestors of and including the given snapshot between the to and from snapshots."""
+    if from_snapshot is not None:
+        for snapshot in ancestors_of(to_snapshot, table_metadata):
+            yield snapshot
+            if snapshot == from_snapshot:
+                break
+    else:
+        yield from ancestors_of(to_snapshot, table_metadata)
diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py
new file mode 100644
index 00000000..7caaf1d5
--- /dev/null
+++ b/pyiceberg/table/update/validate.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyiceberg.exceptions import ValidationException
+from pyiceberg.manifest import ManifestContent, ManifestFile
+from pyiceberg.table import Table
+from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
+
+
+def validation_history(
+    table: Table,
+    to_snapshot: Snapshot,
+    from_snapshot: Snapshot,
+    matching_operations: set[Operation],
+    manifest_content_filter: ManifestContent,
+) -> tuple[list[ManifestFile], set[int]]:
+    """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot.
+
+    Args:
+        table: Table to get the history from
+        to_snapshot: Starting snapshot
+        from_snapshot: Parent snapshot to get the history from
+        matching_operations: Operations to match on
+        manifest_content_filter: Manifest content type to filter
+
+    Raises:
+        ValidationException: If no matching snapshot is found or only one snapshot is found
+
+    Returns:
+        List of manifest files and set of snapshots ID's matching conditions
+    """
+    manifests_files: list[ManifestFile] = []
+    snapshots: set[int] = set()
+
+    last_snapshot = None
+    for snapshot in ancestors_between(to_snapshot, from_snapshot, table.metadata):
+        last_snapshot = snapshot
+        summary = snapshot.summary
+        if summary is None:
+            raise ValidationException(f"No summary found for snapshot {snapshot}!")
+        if summary.operation not in matching_operations:
+            continue
+
+        snapshots.add(snapshot.snapshot_id)
+        # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets
+        manifests_files.extend(
+            [
+                manifest
+                for manifest in snapshot.manifests(table.io)
+                if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter
+            ]
+        )
+
+    if last_snapshot is not None and last_snapshot.snapshot_id != from_snapshot.snapshot_id:
+        raise ValidationException("No matching snapshot found.")
+
+    return manifests_files, snapshots
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index acc31d47..dbac84bd 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -57,7 +57,6 @@ from pyiceberg.table.snapshots import (
     Snapshot,
     SnapshotLogEntry,
     Summary,
-    ancestors_of,
 )
 from pyiceberg.table.sorting import (
     NullOrder,
@@ -225,44 +224,6 @@ def test_snapshot_by_timestamp(table_v2: Table) -> None:
     assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None
 
 
-def test_ancestors_of(table_v2: Table) -> None:
-    assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [
-        Snapshot(
-            snapshot_id=3055729675574597004,
-            parent_snapshot_id=3051729675574597004,
-            sequence_number=1,
-            timestamp_ms=1555100955770,
-            manifest_list="s3://a/b/2.avro",
-            summary=Summary(Operation.APPEND),
-            schema_id=1,
-        ),
-        Snapshot(
-            snapshot_id=3051729675574597004,
-            parent_snapshot_id=None,
-            sequence_number=0,
-            timestamp_ms=1515100955770,
-            manifest_list="s3://a/b/1.avro",
-            summary=Summary(Operation.APPEND),
-            schema_id=None,
-        ),
-    ]
-
-
-def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None:
-    # Test RecursionError: maximum recursion depth exceeded
-    assert (
-        len(
-            list(
-                ancestors_of(
-                    table_v2_with_extensive_snapshots.current_snapshot(),
-                    table_v2_with_extensive_snapshots.metadata,
-                )
-            )
-        )
-        == 2000
-    )
-
-
 def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None:
     assert table_v2.snapshot_by_id(-1) is None
 
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 3f0dae14..2395ea77 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -15,12 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name,eval-used
+from typing import cast
+
 import pytest
 
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries
+from pyiceberg.table import Table
+from pyiceberg.table.snapshots import (
+    Operation,
+    Snapshot,
+    SnapshotSummaryCollector,
+    Summary,
+    ancestors_between,
+    ancestors_of,
+    update_snapshot_summaries,
+)
 from pyiceberg.transforms import IdentityTransform
 from pyiceberg.typedef import Record
 from pyiceberg.types import (
@@ -368,3 +379,58 @@ def test_invalid_type() -> None:
         )
 
     assert "Could not parse summary property total-data-files to an int: abc" in str(e.value)
+
+
+def test_ancestors_of(table_v2: Table) -> None:
+    assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [
+        Snapshot(
+            snapshot_id=3055729675574597004,
+            parent_snapshot_id=3051729675574597004,
+            sequence_number=1,
+            timestamp_ms=1555100955770,
+            manifest_list="s3://a/b/2.avro",
+            summary=Summary(Operation.APPEND),
+            schema_id=1,
+        ),
+        Snapshot(
+            snapshot_id=3051729675574597004,
+            parent_snapshot_id=None,
+            sequence_number=0,
+            timestamp_ms=1515100955770,
+            manifest_list="s3://a/b/1.avro",
+            summary=Summary(Operation.APPEND),
+            schema_id=None,
+        ),
+    ]
+
+
+def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None:
+    # Test RecursionError: maximum recursion depth exceeded
+    assert (
+        len(
+            list(
+                ancestors_of(
+                    table_v2_with_extensive_snapshots.current_snapshot(),
+                    table_v2_with_extensive_snapshots.metadata,
+                )
+            )
+        )
+        == 2000
+    )
+
+
+def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
+    oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0]
+    current_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot())
+    assert (
+        len(
+            list(
+                ancestors_between(
+                    current_snapshot,
+                    oldest_snapshot,
+                    table_v2_with_extensive_snapshots.metadata,
+                )
+            )
+        )
+        == 2000
+    )
diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py
new file mode 100644
index 00000000..eac3733f
--- /dev/null
+++ b/tests/table/test_validate.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name,eval-used
+from typing import cast
+from unittest.mock import patch
+
+import pytest
+
+from pyiceberg.exceptions import ValidationException
+from pyiceberg.io import FileIO
+from pyiceberg.manifest import ManifestContent, ManifestFile
+from pyiceberg.table import Table
+from pyiceberg.table.snapshots import Operation, Snapshot
+from pyiceberg.table.update.validate import validation_history
+
+
+@pytest.fixture
+def table_v2_with_extensive_snapshots_and_manifests(
+    table_v2_with_extensive_snapshots: Table,
+) -> tuple[Table, dict[int, list[ManifestFile]]]:
+    """Fixture to create a table with extensive snapshots and manifests."""
+    mock_manifests = {}
+
+    for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()):
+        mock_manifest = ManifestFile.from_args(
+            manifest_path=f"foo/bar/{i}",
+            manifest_length=1,
+            partition_spec_id=1,
+            content=ManifestContent.DATA if i % 2 == 0 else ManifestContent.DELETES,
+            sequence_number=1,
+            min_sequence_number=1,
+            added_snapshot_id=snapshot.snapshot_id,
+        )
+
+        # Store the manifest for this specific snapshot
+        mock_manifests[snapshot.snapshot_id] = [mock_manifest]
+
+    return table_v2_with_extensive_snapshots, mock_manifests
+
+
+def test_validation_history(table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]]) -> None:
+    """Test the validation history function."""
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    expected_manifest_data_counts = len([m for m in mock_manifests.values() if m[0].content == ManifestContent.DATA])
+
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect):
+        manifests, snapshots = validation_history(
+            table,
+            newest_snapshot,
+            oldest_snapshot,
+            {Operation.APPEND},
+            ManifestContent.DATA,
+        )
+
+        assert len(manifests) == expected_manifest_data_counts
+
+
+def test_validation_history_fails_on_snapshot_with_no_summary(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+    """Test the validation history function fails on snapshot with no summary."""
+    table, _ = table_v2_with_extensive_snapshots_and_manifests
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    # Create a snapshot with no summary
+    snapshot_with_no_summary = Snapshot(
+        snapshot_id="1234",
+        parent_id="5678",
+        timestamp_ms=0,
+        operation=Operation.APPEND,
+        summary=None,
+        manifest_list="foo/bar",
+    )
+    with patch("pyiceberg.table.update.validate.ancestors_between", return_value=[snapshot_with_no_summary]):
+        with pytest.raises(ValidationException):
+            validation_history(
+                table,
+                newest_snapshot,
+                oldest_snapshot,
+                {Operation.APPEND},
+                ManifestContent.DATA,
+            )
+
+
+def test_validation_history_fails_on_from_snapshot_not_matching_last_snapshot(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+    """Test the validation history function fails when from_snapshot doesn't match last_snapshot."""
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    missing_oldest_snapshot = table.snapshots()[1:]
+
+    with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect):
+        with patch("pyiceberg.table.update.validate.ancestors_between", return_value=missing_oldest_snapshot):
+            with pytest.raises(ValidationException):
+                validation_history(
+                    table,
+                    newest_snapshot,
+                    oldest_snapshot,
+                    {Operation.APPEND},
+                    ManifestContent.DATA,
+                )

Reply via email to