This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 0e381fa2 Add history inspect table (#828)
0e381fa2 is described below
commit 0e381fa28bb7c18dc6ba5dc46bedf55d77057e7f
Author: Andre Luis Anastacio <[email protected]>
AuthorDate: Wed Jun 26 18:37:39 2024 -0300
Add history inspect table (#828)
---
mkdocs/docs/api.md | 21 ++++++++++
pyiceberg/table/__init__.py | 28 +++++++++++++
tests/integration/test_inspect_table.py | 69 +++++++++++++++++++++++++++++++++
3 files changed, 118 insertions(+)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 14c72595..6da2fc3a 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -679,6 +679,27 @@ latest_schema_id: [[null,0,0,0]]
latest_sequence_number: [[null,0,0,0]]
```
+### History
+
+To show a table's history:
+
+```python
+table.inspect.history()
+```
+
+```
+pyarrow.Table
+made_current_at: timestamp[ms] not null
+snapshot_id: int64 not null
+parent_id: int64
+is_current_ancestor: bool not null
+----
+made_current_at: [[2024-06-18 16:17:48.768,2024-06-18 16:17:49.240,2024-06-18 16:17:49.343,2024-06-18 16:17:49.511]]
+snapshot_id: [[4358109269873137077,3380769165026943338,4358109269873137077,3089420140651211776]]
+parent_id: [[null,4358109269873137077,null,4358109269873137077]]
+is_current_ancestor: [[true,false,true,true]]
+```
+
## Add Files
Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index dced94de..8c149397 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -113,6 +113,7 @@ from pyiceberg.table.snapshots import (
SnapshotLogEntry,
SnapshotSummaryCollector,
Summary,
+ ancestors_of,
update_snapshot_summaries,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -3879,6 +3880,33 @@ class InspectTable:
schema=table_schema,
)
+ def history(self) -> "pa.Table":
+ import pyarrow as pa
+
+ history_schema = pa.schema([
+ pa.field("made_current_at", pa.timestamp(unit="ms"), nullable=False),
+ pa.field("snapshot_id", pa.int64(), nullable=False),
+ pa.field("parent_id", pa.int64(), nullable=True),
+ pa.field("is_current_ancestor", pa.bool_(), nullable=False),
+ ])
+
+ ancestors_ids = {snapshot.snapshot_id for snapshot in ancestors_of(self.tbl.current_snapshot(), self.tbl.metadata)}
+
+ history = []
+ metadata = self.tbl.metadata
+
+ for snapshot_entry in metadata.snapshot_log:
+ snapshot = metadata.snapshot_by_id(snapshot_entry.snapshot_id)
+
+ history.append({
+ "made_current_at": datetime.utcfromtimestamp(snapshot_entry.timestamp_ms / 1000.0),
+ "snapshot_id": snapshot_entry.snapshot_id,
+ "parent_id": snapshot.parent_snapshot_id if snapshot else None,
+ "is_current_ancestor": snapshot_entry.snapshot_id in ancestors_ids,
+ })
+
+ return pa.Table.from_pylist(history, schema=history_schema)
+
@dataclass(frozen=True)
class TablePartition:
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 2840fb0b..8414fba3 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -568,3 +568,72 @@ def test_inspect_metadata_log_entries(
if column == "timestamp":
continue
assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+ identifier = "default.table_history"
+
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ spark.sql(
+ f"""
+ CREATE TABLE {identifier} (
+ id int,
+ data string
+ )
+ PARTITIONED BY (data)
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {identifier} VALUES (1, "a")
+ """
+ )
+
+ table = session_catalog.load_table(identifier)
+ first_snapshot = table.current_snapshot()
+ snapshot_id = None if not first_snapshot else first_snapshot.snapshot_id
+
+ spark.sql(
+ f"""
+ INSERT INTO {identifier} VALUES (2, "b")
+ """
+ )
+
+ spark.sql(
+ f"""
+ CALL integration.system.rollback_to_snapshot('{identifier}', {snapshot_id})
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {identifier} VALUES (3, "c")
+ """
+ )
+
+ table.refresh()
+
+ df = table.inspect.history()
+
+ assert df.column_names == [
+ "made_current_at",
+ "snapshot_id",
+ "parent_id",
+ "is_current_ancestor",
+ ]
+
+ lhs = spark.table(f"{identifier}.history").toPandas()
+ rhs = df.to_pandas()
+ for column in df.column_names:
+ for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+ if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right):
+ # NaN != NaN in Python
+ continue
+ assert left == right, f"Difference in column {column}: {left} != {right}"