This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cb3cd5f Metadata Log Entries metadata table (#667)
9cb3cd5f is described below

commit 9cb3cd5f1192b3cc008138aea5eff02550c65146
Author: Kevin Liu <[email protected]>
AuthorDate: Wed Jun 26 12:06:23 2024 -0700

    Metadata Log Entries metadata table (#667)
---
 mkdocs/docs/api.md                      | 23 +++++++++++++++++++
 pyiceberg/table/__init__.py             | 34 ++++++++++++++++++++++++++++
 pyiceberg/table/metadata.py             |  7 ++++++
 pyiceberg/table/snapshots.py            |  4 +++-
 tests/integration/test_inspect_table.py | 40 +++++++++++++++++++++++++++++++++
 tests/table/test_snapshots.py           |  1 +
 6 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 54f4a20c..14c72595 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -656,6 +656,29 @@ partition_summaries: [[    -- is_valid: all not null
 ["test"]]]
 ```
 
+### Metadata Log Entries
+
+To show table metadata log entries:
+
+```python
+table.inspect.metadata_log_entries()
+```
+
+```
+pyarrow.Table
+timestamp: timestamp[ms] not null
+file: string not null
+latest_snapshot_id: int64
+latest_schema_id: int32
+latest_sequence_number: int64
+----
+timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]]
+file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]]
+latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]]
+latest_schema_id: [[null,0,0,0]]
+latest_sequence_number: [[null,0,0,0]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c78e005c..dced94de 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3845,6 +3845,40 @@ class InspectTable:
             schema=manifest_schema,
         )
 
+    def metadata_log_entries(self) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.table.snapshots import MetadataLogEntry
+
+        table_schema = pa.schema([
+            pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
+            pa.field("file", pa.string(), nullable=False),
+            pa.field("latest_snapshot_id", pa.int64(), nullable=True),
+            pa.field("latest_schema_id", pa.int32(), nullable=True),
+            pa.field("latest_sequence_number", pa.int64(), nullable=True),
+        ])
+
+        def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any]:
+            latest_snapshot = self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
+            return {
+                "timestamp": metadata_entry.timestamp_ms,
+                "file": metadata_entry.metadata_file,
+                "latest_snapshot_id": latest_snapshot.snapshot_id if latest_snapshot else None,
+                "latest_schema_id": latest_snapshot.schema_id if latest_snapshot else None,
+                "latest_sequence_number": latest_snapshot.sequence_number if latest_snapshot else None,
+            }
+
+        # similar to MetadataLogEntriesTable in Java
+        # https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
+        metadata_log_entries = self.tbl.metadata.metadata_log + [
+            MetadataLogEntry(metadata_file=self.tbl.metadata_location, timestamp_ms=self.tbl.metadata.last_updated_ms)
+        ]
+
+        return pa.Table.from_pylist(
+            [metadata_log_entry_to_row(entry) for entry in metadata_log_entries],
+            schema=table_schema,
+        )
+
 
 @dataclass(frozen=True)
 class TablePartition:
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 8c3c3893..1fea3301 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -311,6 +311,13 @@ class TableMetadataCommonFields(IcebergBaseModel):
             return -1
         return current_snapshot_id
 
+    @field_serializer("snapshots")
+    def serialize_snapshots(self, snapshots: List[Snapshot]) -> List[Snapshot]:
+        # Snapshot field `sequence-number` should not be written for v1 metadata
+        if self.format_version == 1:
+            return [snapshot.model_copy(update={"sequence_number": None}) for snapshot in snapshots]
+        return snapshots
+
 
 def _generate_snapshot_id() -> int:
     """Generate a new Snapshot ID from a UUID.
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index d6a3ff16..842d4252 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -58,6 +58,8 @@ CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"
 CHANGED_PARTITION_PREFIX = "partitions."
 OPERATION = "operation"
 
+INITIAL_SEQUENCE_NUMBER = 0
+
 
 class Operation(Enum):
     """Describes the operation.
@@ -231,7 +233,7 @@ class Summary(IcebergBaseModel, Mapping[str, str]):
 class Snapshot(IcebergBaseModel):
     snapshot_id: int = Field(alias="snapshot-id")
     parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
-    sequence_number: Optional[int] = Field(alias="sequence-number", default=None)
+    sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000))
     manifest_list: Optional[str] = Field(
         alias="manifest-list", description="Location of the snapshot's manifest list file", default=None
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 1f2b9a3e..2840fb0b 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -528,3 +528,43 @@ def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format
     for column in df.column_names:
         for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
             assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_metadata_log_entries(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    from pandas.testing import assert_frame_equal
+
+    identifier = "default.table_metadata_log_entries"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    # Write some data
+    tbl.append(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.metadata_log_entries()
+    spark_df = spark.sql(f"SELECT * FROM {identifier}.metadata_log_entries")
+    lhs = df.to_pandas()
+    rhs = spark_df.toPandas()
+
+    # Timestamp in the last row of `metadata_log_entries` table is based on when the table was read
+    # Therefore, the timestamp of the last row for pyiceberg dataframe and spark dataframe will be different
+    left_before_last, left_last = lhs[:-1], lhs[-1:]
+    right_before_last, right_last = rhs[:-1], rhs[-1:]
+
+    # compare all rows except for the last row
+    assert_frame_equal(left_before_last, right_before_last, check_dtype=False)
+    # compare the last row, except for the timestamp
+    for column in df.column_names:
+        for left, right in zip(left_last[column], right_last[column]):
+            if column == "timestamp":
+                continue
+            assert left == right, f"Difference in column {column}: {left} != {right}"
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 2569a11d..fa346405 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -77,6 +77,7 @@ def test_serialize_snapshot_without_sequence_number() -> None:
     snapshot = Snapshot(
         snapshot_id=25,
         parent_snapshot_id=19,
+        sequence_number=None,
         timestamp_ms=1602638573590,
         manifest_list="s3:/a/b/c.avro",
         summary=Summary(Operation.APPEND),

Reply via email to