This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new eba4beef Initial implementation of the manifest table (#717)
eba4beef is described below

commit eba4beeff046dd92d234fe7779fdbe76d61bd1bf
Author: Drew Gallardo <[email protected]>
AuthorDate: Thu May 23 02:39:26 2024 -0700

    Initial implementation of the manifest table (#717)
---
 mkdocs/docs/api.md                      | 50 ++++++++++++++++++
 pyiceberg/table/__init__.py             | 89 +++++++++++++++++++++++++++++++++
 tests/integration/test_inspect_table.py | 83 ++++++++++++++++++++++++++++++
 3 files changed, 222 insertions(+)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0bc23fb0..70b5fd62 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -606,6 +606,56 @@ min_snapshots_to_keep: [[null,10]]
 max_snapshot_age_in_ms: [[null,604800000]]
 ```
 
+### Manifests
+
+To show a table's current file manifests:
+
+```python
+table.inspect.manifests()
+```
+
+```
+pyarrow.Table
+content: int8 not null
+path: string not null
+length: int64 not null
+partition_spec_id: int32 not null
+added_snapshot_id: int64 not null
+added_data_files_count: int32 not null
+existing_data_files_count: int32 not null
+deleted_data_files_count: int32 not null
+added_delete_files_count: int32 not null
+existing_delete_files_count: int32 not null
+deleted_delete_files_count: int32 not null
+partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
+  child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
+      child 0, contains_null: bool not null
+      child 1, contains_nan: bool
+      child 2, lower_bound: string
+      child 3, upper_bound: string
+----
+content: [[0]]
+path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
+length: [[6886]]
+partition_spec_id: [[0]]
+added_snapshot_id: [[3815834705531553721]]
+added_data_files_count: [[1]]
+existing_data_files_count: [[0]]
+deleted_data_files_count: [[0]]
+added_delete_files_count: [[0]]
+existing_delete_files_count: [[0]]
+deleted_delete_files_count: [[0]]
+partition_summaries: [[    -- is_valid: all not null
+    -- child 0 type: bool
+[false]
+    -- child 1 type: bool
+[false]
+    -- child 2 type: string
+["test"]
+    -- child 3 type: string
+["test"]]]
+```
+
 ## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c57f0d12..74b0225d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -71,6 +71,7 @@ from pyiceberg.manifest import (
     ManifestEntry,
     ManifestEntryStatus,
     ManifestFile,
+    PartitionFieldSummary,
     write_manifest,
     write_manifest_list,
 )
@@ -3547,6 +3548,94 @@ class InspectTable:
             schema=table_schema,
         )
 
+    def manifests(self) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.conversions import from_bytes
+
+        partition_summary_schema = pa.struct([
+            pa.field("contains_null", pa.bool_(), nullable=False),
+            pa.field("contains_nan", pa.bool_(), nullable=True),
+            pa.field("lower_bound", pa.string(), nullable=True),
+            pa.field("upper_bound", pa.string(), nullable=True),
+        ])
+
+        manifest_schema = pa.schema([
+            pa.field('content', pa.int8(), nullable=False),
+            pa.field('path', pa.string(), nullable=False),
+            pa.field('length', pa.int64(), nullable=False),
+            pa.field('partition_spec_id', pa.int32(), nullable=False),
+            pa.field('added_snapshot_id', pa.int64(), nullable=False),
+            pa.field('added_data_files_count', pa.int32(), nullable=False),
+            pa.field('existing_data_files_count', pa.int32(), nullable=False),
+            pa.field('deleted_data_files_count', pa.int32(), nullable=False),
+            pa.field('added_delete_files_count', pa.int32(), nullable=False),
+            pa.field('existing_delete_files_count', pa.int32(), nullable=False),
+            pa.field('deleted_delete_files_count', pa.int32(), nullable=False),
+            pa.field('partition_summaries', pa.list_(partition_summary_schema), nullable=False),
+        ])
+
+        def _partition_summaries_to_rows(
+            spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
+        ) -> List[Dict[str, Any]]:
+            rows = []
+            for i, field_summary in enumerate(partition_summaries):
+                field = spec.fields[i]
+                partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type
+                lower_bound = (
+                    (
+                        field.transform.to_human_string(
+                            partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound)
+                        )
+                    )
+                    if field_summary.lower_bound
+                    else None
+                )
+                upper_bound = (
+                    (
+                        field.transform.to_human_string(
+                            partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound)
+                        )
+                    )
+                    if field_summary.upper_bound
+                    else None
+                )
+                rows.append({
+                    'contains_null': field_summary.contains_null,
+                    'contains_nan': field_summary.contains_nan,
+                    'lower_bound': lower_bound,
+                    'upper_bound': upper_bound,
+                })
+            return rows
+
+        specs = self.tbl.metadata.specs()
+        manifests = []
+        if snapshot := self.tbl.metadata.current_snapshot():
+            for manifest in snapshot.manifests(self.tbl.io):
+                is_data_file = manifest.content == ManifestContent.DATA
+                is_delete_file = manifest.content == ManifestContent.DELETES
+                manifests.append({
+                    'content': manifest.content,
+                    'path': manifest.manifest_path,
+                    'length': manifest.manifest_length,
+                    'partition_spec_id': manifest.partition_spec_id,
+                    'added_snapshot_id': manifest.added_snapshot_id,
+                    'added_data_files_count': manifest.added_files_count if is_data_file else 0,
+                    'existing_data_files_count': manifest.existing_files_count if is_data_file else 0,
+                    'deleted_data_files_count': manifest.deleted_files_count if is_data_file else 0,
+                    'added_delete_files_count': manifest.added_files_count if is_delete_file else 0,
+                    'existing_delete_files_count': manifest.existing_files_count if is_delete_file else 0,
+                    'deleted_delete_files_count': manifest.deleted_files_count if is_delete_file else 0,
+                    'partition_summaries': _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+                    if manifest.partitions
+                    else [],
+                })
+
+        return pa.Table.from_pylist(
+            manifests,
+            schema=manifest_schema,
+        )
+
 
 @dataclass(frozen=True)
 class TablePartition:
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index a884f9d4..8665435e 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -445,3 +445,86 @@ def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Ca
         df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
         spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
         check_pyiceberg_df_equals_spark_df(df, spark_df)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_manifests"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (1, "a")
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (2, "b")
+    """
+    )
+
+    df = session_catalog.load_table(identifier).inspect.manifests()
+
+    assert df.column_names == [
+        'content',
+        'path',
+        'length',
+        'partition_spec_id',
+        'added_snapshot_id',
+        'added_data_files_count',
+        'existing_data_files_count',
+        'deleted_data_files_count',
+        'added_delete_files_count',
+        'existing_delete_files_count',
+        'deleted_delete_files_count',
+        'partition_summaries',
+    ]
+
+    int_cols = [
+        'content',
+        'length',
+        'partition_spec_id',
+        'added_snapshot_id',
+        'added_data_files_count',
+        'existing_data_files_count',
+        'deleted_data_files_count',
+        'added_delete_files_count',
+        'existing_delete_files_count',
+        'deleted_delete_files_count',
+    ]
+
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+
+    for value in df["path"]:
+        assert isinstance(value.as_py(), str)
+
+    for value in df["partition_summaries"]:
+        assert isinstance(value.as_py(), list)
+        for row in value:
+            assert isinstance(row["contains_null"].as_py(), bool)
+            assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
+            assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
+            assert isinstance(row["upper_bound"].as_py(), (str, type(None)))
+
+    lhs = spark.table(f"{identifier}.manifests").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            assert left == right, f"Difference in column {column}: {left} != {right}"

Reply via email to