Fokko commented on code in PR #6933:
URL: https://github.com/apache/iceberg/pull/6933#discussion_r1117879968


##########
python/pyiceberg/table/__init__.py:
##########
@@ -145,12 +152,22 @@ def current_snapshot(self) -> Optional[Snapshot]:
             return self.snapshot_by_id(snapshot_id)
         return None
 
-    def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
-        """Get the snapshot of this table with the given id, or None if there 
is no matching snapshot."""
+    def snapshot_by_id(self, snapshot_id: int) -> Snapshot:
+        """Get the snapshot of this table with the given id.
+
+        Args:
+            snapshot_id: The id of the snapshot to lookup in the table
+
+        Returns:
+            The snapshot that corresponds to snapshot_id
+
+        Raises:
+            ValueError: If the snapshot cannot be found
+        """
         try:
             return next(snapshot for snapshot in self.metadata.snapshots if 
snapshot.snapshot_id == snapshot_id)
-        except StopIteration:
-            return None
+        except StopIteration as e:
+            raise ValueError(f"Snapshot id not found in table: {snapshot_id}") 
from e

Review Comment:
   The `next(...)` pattern is more efficient as it will stop the iteration once 
the snapshot has been found. I'm also happy to materialize it as a `dict` that 
we cache.



##########
python/pyiceberg/table/snapshots.py:
##########
@@ -117,12 +125,369 @@ def manifests(self, io: FileIO) -> List[ManifestFile]:
             return list(read_manifest_list(file))
         return []
 
+    def added_data_files(self, io: FileIO) -> Generator[DataFile, None, None]:
+        for manifest in self.manifests(io):
+            yield from [entry.data_file for entry in 
manifest.fetch_manifest_entry(io)]
+
 
 class MetadataLogEntry(IcebergBaseModel):
     metadata_file: str = Field(alias="metadata-file")
     timestamp_ms: int = Field(alias="timestamp-ms")
 
 
 class SnapshotLogEntry(IcebergBaseModel):
-    snapshot_id: str = Field(alias="snapshot-id")
+    snapshot_id: int = Field(alias="snapshot-id")
     timestamp_ms: int = Field(alias="timestamp-ms")
+
+
+def is_ancestor_of(table: "Table", snapshot_id: int, ancestor_snapshot_id: 
int) -> bool:

Review Comment:
   This is to avoid circular dependencies. We only import Table when doing type 
checking:
   ```python
   if TYPE_CHECKING:
       from pyiceberg.table import Table
   ```
   The Table has snapshots, and the snapshot refers back to the table. By 
putting this in quotes, we still get type-checking, but it doesn't introduce a 
circular dependency.
   
   



##########
python/pyiceberg/table/snapshots.py:
##########
@@ -117,12 +125,369 @@ def manifests(self, io: FileIO) -> List[ManifestFile]:
             return list(read_manifest_list(file))
         return []
 
+    def added_data_files(self, io: FileIO) -> Generator[DataFile, None, None]:
+        for manifest in self.manifests(io):
+            yield from [entry.data_file for entry in 
manifest.fetch_manifest_entry(io)]
+
 
 class MetadataLogEntry(IcebergBaseModel):
     metadata_file: str = Field(alias="metadata-file")
     timestamp_ms: int = Field(alias="timestamp-ms")
 
 
 class SnapshotLogEntry(IcebergBaseModel):
-    snapshot_id: str = Field(alias="snapshot-id")
+    snapshot_id: int = Field(alias="snapshot-id")
     timestamp_ms: int = Field(alias="timestamp-ms")
+
+
+def is_ancestor_of(table: "Table", snapshot_id: int, ancestor_snapshot_id: 
int) -> bool:
+    """
+    Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using 
the given lookup function.
+
+    Args:
+        table: The table
+        snapshot_id: The snapshot id of the snapshot
+        ancestor_snapshot_id: The snapshot id of the possible ancestor
+
+    Returns:
+        True if it is an ancestor or not
+    """
+    snapshots = ancestors_of(snapshot_id, table.snapshot_by_id)
+    for snapshot in snapshots:
+        if snapshot.snapshot_id == ancestor_snapshot_id:
+            return True
+    return False
+
+
+def is_parent_ancestor_of(table: "Table", snapshot_id: int, 
ancestor_parent_snapshot_id: int) -> bool:
+    """
+    Returns whether some ancestor of snapshot_id has parent_id matches 
ancestor_parent_snapshot_id
+
+    Args:
+        table: The table
+        snapshot_id: The snapshot id of the snapshot
+        ancestor_parent_snapshot_id: The snapshot id of the possible parent 
ancestor
+
+    Returns:
+        True if there is an ancestor with a parent
+    """
+    snapshots = ancestors_of(snapshot_id, table.snapshot_by_id)
+    for snapshot in snapshots:
+        if snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
+            return True
+    return False
+
+
+def current_ancestors(table: "Table") -> Iterable[Snapshot]:
+    """
+    Returns an iterable that traverses the table's snapshots from the current 
to the last known ancestor.
+
+    Args:
+        table: The table
+
+    Returns:
+        An iterable of all the ancestors
+    """
+    if current_snapshot := table.current_snapshot():
+        return ancestors_of(current_snapshot, table.snapshot_by_id)
+    return []
+
+
+def current_ancestor_ids(table: "Table") -> Iterable[int]:
+    """
+    Return the snapshot IDs for the ancestors of the current table state.
+
+    Ancestor IDs are ordered by commit time, descending. The first ID is
+    the current snapshot, followed by its parent, and so on.
+
+    Args:
+        table: The table
+
+    Returns:
+        An iterable of all the snapshot IDs
+    """
+    if current_snapshot := table.current_snapshot():
+        return ancestor_ids(current_snapshot, table.snapshot_by_id)
+    return []
+
+
+def oldest_ancestor(table: "Table") -> Optional[Snapshot]:
+    """
+    Traverses the history of the table's current snapshot and finds the oldest 
Snapshot.
+
+    Args:
+        table: The table
+
+    Returns:
+        None if there is no current snapshot in the table, else the oldest 
Snapshot.
+    """
+    oldest_snapshot: Optional[Snapshot] = None
+
+    for snapshot in current_ancestors(table):
+        oldest_snapshot = snapshot
+
+    return oldest_snapshot
+
+
+def oldest_ancestor_of(table: "Table", snapshot_id: int) -> Optional[Snapshot]:
+    """
+    Traverses the history and finds the oldest ancestor of the specified 
snapshot.
+
+    Oldest ancestor is defined as the ancestor snapshot whose parent is null 
or has been
+    expired. If the specified snapshot has no parent or parent has been 
expired, the specified
+    snapshot itself is returned.
+
+    Args:
+        table: The table
+        snapshot_id: the ID of the snapshot to find the oldest ancestor
+
+    Returns:
+        None if there is no current snapshot in the table, else the oldest 
Snapshot.
+    """
+    oldest_snapshot: Optional[Snapshot] = None
+
+    for snapshot in ancestors_of(snapshot_id, table.snapshot_by_id):
+        oldest_snapshot = snapshot
+
+    return oldest_snapshot
+
+
+def oldest_ancestor_after(table: "Table", timestamp_ms: int) -> Snapshot:
+    """
+    Looks up the snapshot after a given point in time
+
+    Args:
+        table: The table
+        timestamp_ms: The timestamp in millis since the Unix epoch
+
+    Returns:
+        The snapshot after the given point in time
+
+    Raises:
+        ValueError: When there is no snapshot older than the given time
+    """
+    if last_snapshot := table.current_snapshot():
+        for snapshot in current_ancestors(table):
+            if snapshot.timestamp_ms < timestamp_ms:
+                return last_snapshot
+            elif snapshot.timestamp_ms == timestamp_ms:
+                return snapshot
+
+            last_snapshot = snapshot
+
+        if last_snapshot is not None and last_snapshot.parent_snapshot_id is 
None:
+            return last_snapshot
+
+    raise ValueError(f"Cannot find snapshot older than: {timestamp_ms}")
+
+
+def snapshots_ids_between(table: "Table", from_snapshot_id: int, 
to_snapshot_id: int) -> Iterable[int]:
+    """
+    Returns list of snapshot ids in the range - (fromSnapshotId, toSnapshotId]
+
+    This method assumes that fromSnapshotId is an ancestor of toSnapshotId.
+
+    Args:
+        table: The table
+        from_snapshot_id: The starting snapshot ID
+        to_snapshot_id: The ending snapshot ID
+
+    Returns:
+        The list of snapshot IDs that are between the given snapshot IDs
+    """
+
+    def lookup(snapshot_id: int) -> Optional[Snapshot]:
+        return table.snapshot_by_id(snapshot_id) if snapshot_id != 
from_snapshot_id else None
+
+    return ancestor_ids(table.snapshot_by_id(snapshot_id=to_snapshot_id), 
lookup)
+
+
+def ancestor_ids(latest_snapshot: Union[int, Snapshot], lookup: 
Callable[[int], Optional[Snapshot]]) -> Iterable[int]:
+    """
+    Returns list of the snapshot IDs of the ancestors
+
+    Args:
+        latest_snapshot: The snapshot where to start from
+        lookup: Lookup function to get the snapshot for the snapshot ID
+
+    Returns:
+        The list of snapshot IDs that are ancestor of the given snapshot
+    """
+
+    def get_id(snapshot: Snapshot) -> int:
+        return snapshot.snapshot_id
+
+    return map(get_id, ancestors_of(latest_snapshot, lookup))
+
+
+def ancestors_of(latest_snapshot: Union[int, Snapshot], lookup: 
Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]:
+    """
+    Returns list of snapshot that are ancestor of the given snapshot
+
+    Args:
+        latest_snapshot: The snapshot where to start from
+        lookup: Lookup function to get the snapshot for the snapshot ID
+
+    Returns:
+        The list of snapshots that are ancestor of the given snapshot
+    """
+    if isinstance(latest_snapshot, int):
+        start = lookup(latest_snapshot)
+        if start is None:
+            raise ValueError(f"Cannot find snapshot: {latest_snapshot}")
+    else:
+        start = latest_snapshot
+
+    def snapshot_generator() -> Generator[Snapshot, None, None]:

Review Comment:
   Idk, yielding directly in the function makes more sense indeed.



##########
python/pyiceberg/table/snapshots.py:
##########
@@ -117,12 +125,369 @@ def manifests(self, io: FileIO) -> List[ManifestFile]:
             return list(read_manifest_list(file))
         return []
 
+    def added_data_files(self, io: FileIO) -> Generator[DataFile, None, None]:
+        for manifest in self.manifests(io):

Review Comment:
   Ah, great catch!



##########
python/pyiceberg/table/snapshots.py:
##########
@@ -117,12 +125,369 @@ def manifests(self, io: FileIO) -> List[ManifestFile]:
             return list(read_manifest_list(file))
         return []
 
+    def added_data_files(self, io: FileIO) -> Generator[DataFile, None, None]:
+        for manifest in self.manifests(io):
+            yield from [entry.data_file for entry in 
manifest.fetch_manifest_entry(io)]
+
 
 class MetadataLogEntry(IcebergBaseModel):
     metadata_file: str = Field(alias="metadata-file")
     timestamp_ms: int = Field(alias="timestamp-ms")
 
 
 class SnapshotLogEntry(IcebergBaseModel):
-    snapshot_id: str = Field(alias="snapshot-id")
+    snapshot_id: int = Field(alias="snapshot-id")
     timestamp_ms: int = Field(alias="timestamp-ms")
+
+
+def is_ancestor_of(table: "Table", snapshot_id: int, ancestor_snapshot_id: 
int) -> bool:
+    """
+    Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using 
the given lookup function.
+
+    Args:
+        table: The table
+        snapshot_id: The snapshot id of the snapshot
+        ancestor_snapshot_id: The snapshot id of the possible ancestor
+
+    Returns:
+        True if it is an ancestor or not
+    """
+    snapshots = ancestors_of(snapshot_id, table.snapshot_by_id)
+    for snapshot in snapshots:
+        if snapshot.snapshot_id == ancestor_snapshot_id:
+            return True
+    return False
+
+
+def is_parent_ancestor_of(table: "Table", snapshot_id: int, 
ancestor_parent_snapshot_id: int) -> bool:
+    """
+    Returns whether some ancestor of snapshot_id has parent_id matches 
ancestor_parent_snapshot_id
+
+    Args:
+        table: The table
+        snapshot_id: The snapshot id of the snapshot
+        ancestor_parent_snapshot_id: The snapshot id of the possible parent 
ancestor
+
+    Returns:
+        True if there is an ancestor with a parent
+    """
+    snapshots = ancestors_of(snapshot_id, table.snapshot_by_id)
+    for snapshot in snapshots:
+        if snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
+            return True
+    return False
+
+
+def current_ancestors(table: "Table") -> Iterable[Snapshot]:
+    """
+    Returns an iterable that traverses the table's snapshots from the current 
to the last known ancestor.
+
+    Args:
+        table: The table
+
+    Returns:
+        An iterable of all the ancestors
+    """
+    if current_snapshot := table.current_snapshot():
+        return ancestors_of(current_snapshot, table.snapshot_by_id)
+    return []
+
+
+def current_ancestor_ids(table: "Table") -> Iterable[int]:
+    """
+    Return the snapshot IDs for the ancestors of the current table state.
+
+    Ancestor IDs are ordered by commit time, descending. The first ID is
+    the current snapshot, followed by its parent, and so on.
+
+    Args:
+        table: The table
+
+    Returns:
+        An iterable of all the snapshot IDs
+    """
+    if current_snapshot := table.current_snapshot():
+        return ancestor_ids(current_snapshot, table.snapshot_by_id)
+    return []
+
+
+def oldest_ancestor(table: "Table") -> Optional[Snapshot]:
+    """
+    Traverses the history of the table's current snapshot and finds the oldest 
Snapshot.
+
+    Args:
+        table: The table
+
+    Returns:
+        None if there is no current snapshot in the table, else the oldest 
Snapshot.
+    """
+    oldest_snapshot: Optional[Snapshot] = None
+
+    for snapshot in current_ancestors(table):
+        oldest_snapshot = snapshot
+
+    return oldest_snapshot
+
+
+def oldest_ancestor_of(table: "Table", snapshot_id: int) -> Optional[Snapshot]:
+    """
+    Traverses the history and finds the oldest ancestor of the specified 
snapshot.
+
+    Oldest ancestor is defined as the ancestor snapshot whose parent is null 
or has been
+    expired. If the specified snapshot has no parent or parent has been 
expired, the specified
+    snapshot itself is returned.
+
+    Args:
+        table: The table
+        snapshot_id: the ID of the snapshot to find the oldest ancestor
+
+    Returns:
+        None if there is no current snapshot in the table, else the oldest 
Snapshot.
+    """
+    oldest_snapshot: Optional[Snapshot] = None
+
+    for snapshot in ancestors_of(snapshot_id, table.snapshot_by_id):
+        oldest_snapshot = snapshot
+
+    return oldest_snapshot
+
+
+def oldest_ancestor_after(table: "Table", timestamp_ms: int) -> Snapshot:
+    """
+    Looks up the snapshot after a given point in time
+
+    Args:
+        table: The table
+        timestamp_ms: The timestamp in millis since the Unix epoch
+
+    Returns:
+        The snapshot after the given point in time
+
+    Raises:
+        ValueError: When there is no snapshot older than the given time
+    """
+    if last_snapshot := table.current_snapshot():
+        for snapshot in current_ancestors(table):
+            if snapshot.timestamp_ms < timestamp_ms:
+                return last_snapshot
+            elif snapshot.timestamp_ms == timestamp_ms:
+                return snapshot
+
+            last_snapshot = snapshot
+
+        if last_snapshot is not None and last_snapshot.parent_snapshot_id is 
None:
+            return last_snapshot
+
+    raise ValueError(f"Cannot find snapshot older than: {timestamp_ms}")
+
+
+def snapshots_ids_between(table: "Table", from_snapshot_id: int, 
to_snapshot_id: int) -> Iterable[int]:
+    """
+    Returns list of snapshot ids in the range - (fromSnapshotId, toSnapshotId]
+
+    This method assumes that fromSnapshotId is an ancestor of toSnapshotId.
+
+    Args:
+        table: The table
+        from_snapshot_id: The starting snapshot ID
+        to_snapshot_id: The ending snapshot ID
+
+    Returns:
+        The list of snapshot IDs that are between the given snapshot IDs
+    """
+
+    def lookup(snapshot_id: int) -> Optional[Snapshot]:
+        return table.snapshot_by_id(snapshot_id) if snapshot_id != 
from_snapshot_id else None
+
+    return ancestor_ids(table.snapshot_by_id(snapshot_id=to_snapshot_id), 
lookup)
+
+
+def ancestor_ids(latest_snapshot: Union[int, Snapshot], lookup: 
Callable[[int], Optional[Snapshot]]) -> Iterable[int]:
+    """
+    Returns list of the snapshot IDs of the ancestors
+
+    Args:
+        latest_snapshot: The snapshot where to start from
+        lookup: Lookup function to get the snapshot for the snapshot ID
+
+    Returns:
+        The list of snapshot IDs that are ancestor of the given snapshot
+    """
+
+    def get_id(snapshot: Snapshot) -> int:
+        return snapshot.snapshot_id
+
+    return map(get_id, ancestors_of(latest_snapshot, lookup))
+
+
+def ancestors_of(latest_snapshot: Union[int, Snapshot], lookup: 
Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]:
+    """
+    Returns list of snapshot that are ancestor of the given snapshot
+
+    Args:
+        latest_snapshot: The snapshot where to start from
+        lookup: Lookup function to get the snapshot for the snapshot ID
+
+    Returns:
+        The list of snapshots that are ancestor of the given snapshot
+    """
+    if isinstance(latest_snapshot, int):
+        start = lookup(latest_snapshot)
+        if start is None:
+            raise ValueError(f"Cannot find snapshot: {latest_snapshot}")

Review Comment:
   It can also be a wrapper function around `table.snapshot_by_id`: 
https://github.com/apache/iceberg/pull/6933/files#diff-4b189d9831bdf4c6d55c019998dd96ea7a4cd1313d2c725f582d862a8fbe0e0dR373-R374
   
   But I agree that it is nicer to keep everything in line. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to