This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 2ee2d193 Add Refs metadata table (#602)
2ee2d193 is described below
commit 2ee2d1937e2782c4b48f4b6d9f496e49858e82fc
Author: Drew Gallardo <[email protected]>
AuthorDate: Wed Apr 17 04:18:40 2024 -0700
Add Refs metadata table (#602)
* Add Refs metadata table
* address the linting issues
* add more tests to the output
* Change string type to categorical
* fix linting after rebasing.
* Update tests/integration/test_inspect_table.py
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
mkdocs/docs/api.md | 25 ++++++++++++++
pyiceberg/table/__init__.py | 26 ++++++++++++++
tests/integration/test_inspect_table.py | 60 +++++++++++++++++++++++++++++++++
3 files changed, 111 insertions(+)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index cf4276ed..0bc23fb0 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -581,6 +581,31 @@ readable_metrics: [
[6.0989]]
```
+### References
+
+To show a table's known snapshot references (its branches and tags):
+
+```python
+table.inspect.refs()
+```
+
+```
+pyarrow.Table
+name: string not null
+type: dictionary<values=string, indices=int32, ordered=0> not null
+snapshot_id: int64 not null
+max_reference_age_in_ms: int64
+min_snapshots_to_keep: int32
+max_snapshot_age_in_ms: int64
+----
+name: [["main","testTag"]]
+type: [  -- dictionary:
+["BRANCH","TAG"]  -- indices:
+[0,1]]
+snapshot_id: [[2278002651076891950,2278002651076891950]]
+max_reference_age_in_ms: [[null,604800000]]
+min_snapshots_to_keep: [[null,null]]
+max_snapshot_age_in_ms: [[null,null]]
+```
+
## Add Files
Expert Iceberg users may choose to commit existing parquet files to the
Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 95fdb1d2..13186c42 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3423,6 +3423,32 @@ class InspectTable:
schema=entries_schema,
)
+ def refs(self) -> "pa.Table":
+ import pyarrow as pa
+
+ ref_schema = pa.schema([
+ pa.field('name', pa.string(), nullable=False),
+ pa.field('type', pa.dictionary(pa.int32(), pa.string()),
nullable=False),
+ pa.field('snapshot_id', pa.int64(), nullable=False),
+ pa.field('max_reference_age_in_ms', pa.int64(), nullable=True),
+ pa.field('min_snapshots_to_keep', pa.int32(), nullable=True),
+ pa.field('max_snapshot_age_in_ms', pa.int64(), nullable=True),
+ ])
+
+ ref_results = []
+ for ref in self.tbl.metadata.refs:
+ if snapshot_ref := self.tbl.metadata.refs.get(ref):
+ ref_results.append({
+ 'name': ref,
+ 'type': snapshot_ref.snapshot_ref_type.upper(),
+ 'snapshot_id': snapshot_ref.snapshot_id,
+ 'max_reference_age_in_ms': snapshot_ref.max_ref_age_ms,
+ 'min_snapshots_to_keep':
snapshot_ref.min_snapshots_to_keep,
+ 'max_snapshot_age_in_ms': snapshot_ref.max_snapshot_age_ms,
+ })
+
+ return pa.Table.from_pylist(ref_results, schema=ref_schema)
+
def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
import pyarrow as pa
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index d9ec5634..0905eda8 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -272,6 +272,66 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal
assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None,
'dt_month': 612}
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_refs(
+ spark: SparkSession, session_catalog: Catalog, arrow_table_with_null:
pa.Table, format_version: int
+) -> None:
+ identifier = "default.table_metadata_refs"
+ tbl = _create_table(session_catalog, identifier,
properties={"format-version": format_version})
+
+ # write data to create snapshot
+ tbl.overwrite(arrow_table_with_null)
+
+ # create a test branch
+ spark.sql(
+ f"""
+ ALTER TABLE {identifier} CREATE BRANCH IF NOT EXISTS testBranch RETAIN 7
DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS
+ """
+ )
+
+ # create a test tag against current snapshot
+ current_snapshot = tbl.current_snapshot()
+ assert current_snapshot is not None
+ current_snapshot_id = current_snapshot.snapshot_id
+
+ spark.sql(
+ f"""
+ ALTER TABLE {identifier} CREATE TAG testTag AS OF VERSION
{current_snapshot_id} RETAIN 180 DAYS
+ """
+ )
+
+ df = tbl.refresh().inspect.refs()
+
+ assert df.column_names == [
+ 'name',
+ 'type',
+ 'snapshot_id',
+ 'max_reference_age_in_ms',
+ 'min_snapshots_to_keep',
+ 'max_snapshot_age_in_ms',
+ ]
+
+ assert [name.as_py() for name in df['name']] == ['testBranch', 'main',
'testTag']
+ assert [ref_type.as_py() for ref_type in df['type']] == ['BRANCH',
'BRANCH', 'TAG']
+
+ for snapshot_id in df['snapshot_id']:
+ assert isinstance(snapshot_id.as_py(), int)
+
+ for int_column in ['max_reference_age_in_ms', 'min_snapshots_to_keep',
'max_snapshot_age_in_ms']:
+ for value in df[int_column]:
+ assert isinstance(value.as_py(), int) or not value.as_py()
+
+ lhs = spark.table(f"{identifier}.refs").toPandas()
+ rhs = df.to_pandas()
+ for column in df.column_names:
+ for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+ if isinstance(left, float) and math.isnan(left) and
isinstance(right, float) and math.isnan(right):
+ # NaN != NaN in Python
+ continue
+ assert left == right, f"Difference in column {column}: {left} !=
{right}"
+
+
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_inspect_partitions_unpartitioned(