This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ee2d193 Add Refs metadata table (#602)
2ee2d193 is described below

commit 2ee2d1937e2782c4b48f4b6d9f496e49858e82fc
Author: Drew Gallardo <[email protected]>
AuthorDate: Wed Apr 17 04:18:40 2024 -0700

    Add Refs metadata table (#602)
    
    * Add Refs metadata table
    
    * address the linting issues
    
    * add more tests to the output
    
    * Change string type to categorical
    
    * fix linting after rebasing.
    
    * Update tests/integration/test_inspect_table.py
    
    ---------
    
    Co-authored-by: Fokko Driesprong <[email protected]>
---
 mkdocs/docs/api.md                      | 25 ++++++++++++++
 pyiceberg/table/__init__.py             | 26 ++++++++++++++
 tests/integration/test_inspect_table.py | 60 +++++++++++++++++++++++++++++++++
 3 files changed, 111 insertions(+)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index cf4276ed..0bc23fb0 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -581,6 +581,31 @@ readable_metrics: [
 [6.0989]]
 ```
 
+### References
+
+To show a table's known snapshot references:
+
+```python
+table.inspect.refs()
+```
+
+```
+pyarrow.Table
+name: string not null
+type: string not null
+snapshot_id: int64 not null
+max_reference_age_in_ms: int64
+min_snapshots_to_keep: int32
+max_snapshot_age_in_ms: int64
+----
+name: [["main","testTag"]]
+type: [["BRANCH","TAG"]]
+snapshot_id: [[2278002651076891950,2278002651076891950]]
+max_reference_age_in_ms: [[null,604800000]]
+min_snapshots_to_keep: [[null,10]]
+max_snapshot_age_in_ms: [[null,604800000]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the 
Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 95fdb1d2..13186c42 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3423,6 +3423,32 @@ class InspectTable:
             schema=entries_schema,
         )
 
+    def refs(self) -> "pa.Table":
+        import pyarrow as pa
+
+        ref_schema = pa.schema([
+            pa.field('name', pa.string(), nullable=False),
+            pa.field('type', pa.dictionary(pa.int32(), pa.string()), 
nullable=False),
+            pa.field('snapshot_id', pa.int64(), nullable=False),
+            pa.field('max_reference_age_in_ms', pa.int64(), nullable=True),
+            pa.field('min_snapshots_to_keep', pa.int32(), nullable=True),
+            pa.field('max_snapshot_age_in_ms', pa.int64(), nullable=True),
+        ])
+
+        ref_results = []
+        for ref in self.tbl.metadata.refs:
+            if snapshot_ref := self.tbl.metadata.refs.get(ref):
+                ref_results.append({
+                    'name': ref,
+                    'type': snapshot_ref.snapshot_ref_type.upper(),
+                    'snapshot_id': snapshot_ref.snapshot_id,
+                    'max_reference_age_in_ms': snapshot_ref.max_ref_age_ms,
+                    'min_snapshots_to_keep': 
snapshot_ref.min_snapshots_to_keep,
+                    'max_snapshot_age_in_ms': snapshot_ref.max_snapshot_age_ms,
+                })
+
+        return pa.Table.from_pylist(ref_results, schema=ref_schema)
+
     def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         import pyarrow as pa
 
diff --git a/tests/integration/test_inspect_table.py 
b/tests/integration/test_inspect_table.py
index d9ec5634..0905eda8 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -272,6 +272,66 @@ def test_inspect_entries_partitioned(spark: SparkSession, 
session_catalog: Catal
     assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 
'dt_month': 612}
 
 
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_refs(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_refs"
+    tbl = _create_table(session_catalog, identifier, 
properties={"format-version": format_version})
+
+    # write data to create snapshot
+    tbl.overwrite(arrow_table_with_null)
+
+    # create a test branch
+    spark.sql(
+        f"""
+    ALTER TABLE {identifier} CREATE BRANCH IF NOT EXISTS testBranch RETAIN 7 
DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS
+        """
+    )
+
+    # create a test tag against current snapshot
+    current_snapshot = tbl.current_snapshot()
+    assert current_snapshot is not None
+    current_snapshot_id = current_snapshot.snapshot_id
+
+    spark.sql(
+        f"""
+    ALTER TABLE {identifier} CREATE TAG testTag AS OF VERSION 
{current_snapshot_id} RETAIN 180 DAYS
+        """
+    )
+
+    df = tbl.refresh().inspect.refs()
+
+    assert df.column_names == [
+        'name',
+        'type',
+        'snapshot_id',
+        'max_reference_age_in_ms',
+        'min_snapshots_to_keep',
+        'max_snapshot_age_in_ms',
+    ]
+
+    assert [name.as_py() for name in df['name']] == ['testBranch', 'main', 
'testTag']
+    assert [ref_type.as_py() for ref_type in df['type']] == ['BRANCH', 
'BRANCH', 'TAG']
+
+    for snapshot_id in df['snapshot_id']:
+        assert isinstance(snapshot_id.as_py(), int)
+
+    for int_column in ['max_reference_age_in_ms', 'min_snapshots_to_keep', 
'max_snapshot_age_in_ms']:
+        for value in df[int_column]:
+            assert isinstance(value.as_py(), int) or not value.as_py()
+
+    lhs = spark.table(f"{identifier}.refs").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if isinstance(left, float) and math.isnan(left) and 
isinstance(right, float) and math.isnan(right):
+                # NaN != NaN in Python
+                continue
+            assert left == right, f"Difference in column {column}: {left} != 
{right}"
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
 def test_inspect_partitions_unpartitioned(

Reply via email to