This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new cad0ad7d Add `all_manifests` metadata table with tests (#1241)
cad0ad7d is described below

commit cad0ad7d9358315abe1315de2a64227d91acceaa
Author: Soumya Ghosh <[email protected]>
AuthorDate: Sat Jan 11 06:41:46 2025 +0530

    Add `all_manifests` metadata table with tests (#1241)
    
    * Add `all_manifests` metadata table with tests
    
    * Move get_manifests_schema and get_all_manifests_schema to InspectTable class
    
    * Update tests for all_manifests table
    
    * Added linter changes in inspect.py
---
 pyiceberg/table/inspect.py              | 75 ++++++++++++++++++---------
 tests/integration/test_inspect_table.py | 92 +++++++++++++++++++++++++++++++++
 2 files changed, 143 insertions(+), 24 deletions(-)

diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 71d38a22..6dfa78a7 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -17,13 +17,14 @@
 from __future__ import annotations
 
 from datetime import datetime, timezone
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
+from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.singleton import _convert_to_hashable_type
 
 if TYPE_CHECKING:
@@ -346,7 +347,7 @@ class InspectTable:
             schema=table_schema,
         )
 
-    def manifests(self) -> "pa.Table":
+    def _get_manifests_schema(self) -> "pa.Schema":
         import pyarrow as pa
 
         partition_summary_schema = pa.struct(
@@ -374,6 +375,17 @@ class InspectTable:
                 pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False),
             ]
         )
+        return manifest_schema
+
+    def _get_all_manifests_schema(self) -> "pa.Schema":
+        import pyarrow as pa
+
+        all_manifests_schema = self._get_manifests_schema()
+        all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
+        return all_manifests_schema
+
+    def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
+        import pyarrow as pa
 
         def _partition_summaries_to_rows(
             spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
@@ -412,36 +424,38 @@ class InspectTable:
 
         specs = self.tbl.metadata.specs()
         manifests = []
-        if snapshot := self.tbl.metadata.current_snapshot():
+        if snapshot:
             for manifest in snapshot.manifests(self.tbl.io):
                 is_data_file = manifest.content == ManifestContent.DATA
                 is_delete_file = manifest.content == ManifestContent.DELETES
-                manifests.append(
-                    {
-                        "content": manifest.content,
-                        "path": manifest.manifest_path,
-                        "length": manifest.manifest_length,
-                        "partition_spec_id": manifest.partition_spec_id,
-                        "added_snapshot_id": manifest.added_snapshot_id,
-                        "added_data_files_count": manifest.added_files_count if is_data_file else 0,
-                        "existing_data_files_count": manifest.existing_files_count if is_data_file else 0,
-                        "deleted_data_files_count": manifest.deleted_files_count if is_data_file else 0,
-                        "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
-                        "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
-                        "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
-                        "partition_summaries": _partition_summaries_to_rows(
-                            specs[manifest.partition_spec_id], manifest.partitions
-                        )
-                        if manifest.partitions
-                        else [],
-                    }
-                )
+                manifest_row = {
+                    "content": manifest.content,
+                    "path": manifest.manifest_path,
+                    "length": manifest.manifest_length,
+                    "partition_spec_id": manifest.partition_spec_id,
+                    "added_snapshot_id": manifest.added_snapshot_id,
+                    "added_data_files_count": manifest.added_files_count if is_data_file else 0,
+                    "existing_data_files_count": manifest.existing_files_count if is_data_file else 0,
+                    "deleted_data_files_count": manifest.deleted_files_count if is_data_file else 0,
+                    "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
+                    "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
+                    "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
+                    "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+                    if manifest.partitions
+                    else [],
+                }
+                if is_all_manifests_table:
+                    manifest_row["reference_snapshot_id"] = snapshot.snapshot_id
+                manifests.append(manifest_row)
 
         return pa.Table.from_pylist(
             manifests,
-            schema=manifest_schema,
+            schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
         )
 
+    def manifests(self) -> "pa.Table":
+        return self._generate_manifests_table(self.tbl.current_snapshot())
+
     def metadata_log_entries(self) -> "pa.Table":
         import pyarrow as pa
 
@@ -630,3 +644,16 @@ class InspectTable:
 
     def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
+
+    def all_manifests(self) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
+            lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
+        )
+        return pa.concat_tables(manifests_by_snapshots)
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 68b10f32..75fe92a6 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -846,3 +846,95 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog
     inspect_files_asserts(files_df)
     inspect_files_asserts(data_files_df)
     inspect_files_asserts(delete_files_df)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    from pandas.testing import assert_frame_equal
+
+    identifier = "default.table_metadata_all_manifests"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        TBLPROPERTIES ('write.update.mode'='merge-on-read',
+                       'write.delete.mode'='merge-on-read')
+    """
+    )
+    tbl = session_catalog.load_table(identifier)
+
+    # check all_manifests when there are no snapshots
+    lhs = tbl.inspect.all_manifests().to_pandas()
+    rhs = spark.table(f"{identifier}.all_manifests").toPandas()
+    assert_frame_equal(lhs, rhs, check_dtype=False)
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")
+
+    spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")
+
+    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")
+
+    spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')")
+
+    tbl.refresh()
+    df = tbl.inspect.all_manifests()
+
+    assert df.column_names == [
+        "content",
+        "path",
+        "length",
+        "partition_spec_id",
+        "added_snapshot_id",
+        "added_data_files_count",
+        "existing_data_files_count",
+        "deleted_data_files_count",
+        "added_delete_files_count",
+        "existing_delete_files_count",
+        "deleted_delete_files_count",
+        "partition_summaries",
+        "reference_snapshot_id",
+    ]
+
+    int_cols = [
+        "content",
+        "length",
+        "partition_spec_id",
+        "added_snapshot_id",
+        "added_data_files_count",
+        "existing_data_files_count",
+        "deleted_data_files_count",
+        "added_delete_files_count",
+        "existing_delete_files_count",
+        "deleted_delete_files_count",
+        "reference_snapshot_id",
+    ]
+
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+
+    for value in df["path"]:
+        assert isinstance(value.as_py(), str)
+
+    for value in df["partition_summaries"]:
+        assert isinstance(value.as_py(), list)
+        for row in value:
+            assert isinstance(row["contains_null"].as_py(), bool)
+            assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
+            assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
+            assert isinstance(row["upper_bound"].as_py(), (str, type(None)))
+
+    lhs = spark.table(f"{identifier}.all_manifests").toPandas()
+    rhs = df.to_pandas()
+    assert_frame_equal(lhs, rhs, check_dtype=False)

Reply via email to