This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 9cb3cd5f Metadata Log Entries metadata table (#667)
9cb3cd5f is described below
commit 9cb3cd5f1192b3cc008138aea5eff02550c65146
Author: Kevin Liu <[email protected]>
AuthorDate: Wed Jun 26 12:06:23 2024 -0700
Metadata Log Entries metadata table (#667)
---
mkdocs/docs/api.md | 23 +++++++++++++++++++
pyiceberg/table/__init__.py | 34 ++++++++++++++++++++++++++++
pyiceberg/table/metadata.py | 7 ++++++
pyiceberg/table/snapshots.py | 4 +++-
tests/integration/test_inspect_table.py | 40 +++++++++++++++++++++++++++++++++
tests/table/test_snapshots.py | 1 +
6 files changed, 108 insertions(+), 1 deletion(-)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 54f4a20c..14c72595 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -656,6 +656,29 @@ partition_summaries: [[ -- is_valid: all not null
["test"]]]
```
+### Metadata Log Entries
+
+To show table metadata log entries:
+
+```python
+table.inspect.metadata_log_entries()
+```
+
+```
+pyarrow.Table
+timestamp: timestamp[ms] not null
+file: string not null
+latest_snapshot_id: int64
+latest_schema_id: int32
+latest_sequence_number: int64
+----
+timestamp: [[2024-04-28 17:03:00.214,2024-04-28 17:03:00.352,2024-04-28 17:03:00.445,2024-04-28 17:03:00.498]]
+file: [["s3://warehouse/default/table_metadata_log_entries/metadata/00000-0b3b643b-0f3a-4787-83ad-601ba57b7319.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00001-f74e4b2c-0f89-4f55-822d-23d099fd7d54.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00002-97e31507-e4d9-4438-aff1-3c0c5304d271.metadata.json","s3://warehouse/default/table_metadata_log_entries/metadata/00003-6c8b7033-6ad8-4fe4-b64d-d70381aeaddc.metadata.json"]]
+latest_snapshot_id: [[null,3958871664825505738,1289234307021405706,7640277914614648349]]
+latest_schema_id: [[null,0,0,0]]
+latest_sequence_number: [[null,0,0,0]]
+```
+
## Add Files
Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c78e005c..dced94de 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3845,6 +3845,40 @@ class InspectTable:
schema=manifest_schema,
)
+ def metadata_log_entries(self) -> "pa.Table":
+ import pyarrow as pa
+
+ from pyiceberg.table.snapshots import MetadataLogEntry
+
+ table_schema = pa.schema([
+ pa.field("timestamp", pa.timestamp(unit="ms"), nullable=False),
+ pa.field("file", pa.string(), nullable=False),
+ pa.field("latest_snapshot_id", pa.int64(), nullable=True),
+ pa.field("latest_schema_id", pa.int32(), nullable=True),
+ pa.field("latest_sequence_number", pa.int64(), nullable=True),
+ ])
+
+ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) ->
Dict[str, Any]:
+ latest_snapshot =
self.tbl.snapshot_as_of_timestamp(metadata_entry.timestamp_ms)
+ return {
+ "timestamp": metadata_entry.timestamp_ms,
+ "file": metadata_entry.metadata_file,
+ "latest_snapshot_id": latest_snapshot.snapshot_id if
latest_snapshot else None,
+ "latest_schema_id": latest_snapshot.schema_id if
latest_snapshot else None,
+ "latest_sequence_number": latest_snapshot.sequence_number if
latest_snapshot else None,
+ }
+
+ # similar to MetadataLogEntriesTable in Java
+ #
https://github.com/apache/iceberg/blob/8a70fe0ff5f241aec8856f8091c77fdce35ad256/core/src/main/java/org/apache/iceberg/MetadataLogEntriesTable.java#L62-L66
+ metadata_log_entries = self.tbl.metadata.metadata_log + [
+ MetadataLogEntry(metadata_file=self.tbl.metadata_location,
timestamp_ms=self.tbl.metadata.last_updated_ms)
+ ]
+
+ return pa.Table.from_pylist(
+ [metadata_log_entry_to_row(entry) for entry in
metadata_log_entries],
+ schema=table_schema,
+ )
+
@dataclass(frozen=True)
class TablePartition:
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 8c3c3893..1fea3301 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -311,6 +311,13 @@ class TableMetadataCommonFields(IcebergBaseModel):
return -1
return current_snapshot_id
+ @field_serializer("snapshots")
+ def serialize_snapshots(self, snapshots: List[Snapshot]) -> List[Snapshot]:
+ # Snapshot field `sequence-number` should not be written for v1
metadata
+ if self.format_version == 1:
+ return [snapshot.model_copy(update={"sequence_number": None}) for
snapshot in snapshots]
+ return snapshots
+
def _generate_snapshot_id() -> int:
"""Generate a new Snapshot ID from a UUID.
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index d6a3ff16..842d4252 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -58,6 +58,8 @@ CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"
CHANGED_PARTITION_PREFIX = "partitions."
OPERATION = "operation"
+INITIAL_SEQUENCE_NUMBER = 0
+
class Operation(Enum):
"""Describes the operation.
@@ -231,7 +233,7 @@ class Summary(IcebergBaseModel, Mapping[str, str]):
class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id",
default=None)
- sequence_number: Optional[int] = Field(alias="sequence-number",
default=None)
+ sequence_number: Optional[int] = Field(alias="sequence-number",
default=INITIAL_SEQUENCE_NUMBER)
timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda:
int(time.time() * 1000))
manifest_list: Optional[str] = Field(
alias="manifest-list", description="Location of the snapshot's
manifest list file", default=None
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 1f2b9a3e..2840fb0b 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -528,3 +528,43 @@ def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
assert left == right, f"Difference in column {column}: {left} !=
{right}"
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_metadata_log_entries(
+ spark: SparkSession, session_catalog: Catalog, arrow_table_with_null:
pa.Table, format_version: int
+) -> None:
+ from pandas.testing import assert_frame_equal
+
+ identifier = "default.table_metadata_log_entries"
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = _create_table(session_catalog, identifier,
properties={"format-version": format_version})
+
+ # Write some data
+ tbl.append(arrow_table_with_null)
+ tbl.append(arrow_table_with_null)
+ tbl.append(arrow_table_with_null)
+
+ df = tbl.inspect.metadata_log_entries()
+ spark_df = spark.sql(f"SELECT * FROM {identifier}.metadata_log_entries")
+ lhs = df.to_pandas()
+ rhs = spark_df.toPandas()
+
+ # Timestamp in the last row of `metadata_log_entries` table is based on
when the table was read
+ # Therefore, the timestamp of the last row for pyiceberg dataframe and
spark dataframe will be different
+ left_before_last, left_last = lhs[:-1], lhs[-1:]
+ right_before_last, right_last = rhs[:-1], rhs[-1:]
+
+ # compare all rows except for the last row
+ assert_frame_equal(left_before_last, right_before_last, check_dtype=False)
+ # compare the last row, except for the timestamp
+ for column in df.column_names:
+ for left, right in zip(left_last[column], right_last[column]):
+ if column == "timestamp":
+ continue
+ assert left == right, f"Difference in column {column}: {left} !=
{right}"
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 2569a11d..fa346405 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -77,6 +77,7 @@ def test_serialize_snapshot_without_sequence_number() -> None:
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
+ sequence_number=None,
timestamp_ms=1602638573590,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),