This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 49ac3a27 Add Partitions Metadata Table (#603)
49ac3a27 is described below

commit 49ac3a27794fc12cfb67b29502ba92b429396201
Author: Sung Yun <107272191+syu...@users.noreply.github.com>
AuthorDate: Tue Apr 16 02:22:24 2024 -0400

    Add Partitions Metadata Table (#603)
---
 mkdocs/docs/api.md                      |  41 +++++++++++
 pyiceberg/table/__init__.py             |  89 +++++++++++++++++++++++
 tests/integration/test_inspect_table.py | 120 ++++++++++++++++++++++++++++++++
 3 files changed, 250 insertions(+)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 9bdb6dcd..cf4276ed 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -382,6 +382,47 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
 summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","54
 [...]
 ```
 
+### Partitions
+
+Inspect the partitions of the table:
+
+```python
+table.inspect.partitions()
+```
+
+```
+pyarrow.Table
+partition: struct<dt_month: int32, dt_day: date32[day]> not null
+  child 0, dt_month: int32
+  child 1, dt_day: date32[day]
+spec_id: int32 not null
+record_count: int64 not null
+file_count: int32 not null
+total_data_file_size_in_bytes: int64 not null
+position_delete_record_count: int64 not null
+position_delete_file_count: int32 not null
+equality_delete_record_count: int64 not null
+equality_delete_file_count: int32 not null
+last_updated_at: timestamp[ms]
+last_updated_snapshot_id: int64
+----
+partition: [
+  -- is_valid: all not null
+  -- child 0 type: int32
+[null,null,612]
+  -- child 1 type: date32[day]
+[null,2021-02-01,null]]
+spec_id: [[2,1,0]]
+record_count: [[1,1,2]]
+file_count: [[1,1,2]]
+total_data_file_size_in_bytes: [[641,641,1260]]
+position_delete_record_count: [[0,0,0]]
+position_delete_file_count: [[0,0,0]]
+equality_delete_record_count: [[0,0,0]]
+equality_delete_file_count: [[0,0,0]]
+last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]
+```
+
 ### Entries
 
 To show all the table's current manifest entries for both data and delete files.
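
For anyone trying the new API outside the docs, a minimal end-to-end sketch follows; the catalog name and table identifier are illustrative placeholders, not part of this change:

```python
from pyiceberg.catalog import load_catalog

# Assumes a catalog named "default" is configured (e.g. in .pyiceberg.yaml) and
# that a table "default.taxis" exists; both names are illustrative.
catalog = load_catalog("default")
table = catalog.load_table("default.taxis")

# inspect.partitions() returns a pyarrow.Table, so it composes with the usual
# Arrow/pandas tooling, e.g. ranking partitions by on-disk size.
partitions = table.inspect.partitions()
print(
    partitions.to_pandas()
    .sort_values("total_data_file_size_in_bytes", ascending=False)
    .head()
)
```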
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index da4b1465..95fdb1d2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -137,6 +137,7 @@ from pyiceberg.types import (
 )
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.datetime import datetime_to_millis
+from pyiceberg.utils.singleton import _convert_to_hashable_type
 
 if TYPE_CHECKING:
     import daft
@@ -3422,6 +3423,94 @@ class InspectTable:
             schema=entries_schema,
         )
 
+    def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+        table_schema = pa.schema([
+            pa.field('record_count', pa.int64(), nullable=False),
+            pa.field('file_count', pa.int32(), nullable=False),
+            pa.field('total_data_file_size_in_bytes', pa.int64(), nullable=False),
+            pa.field('position_delete_record_count', pa.int64(), nullable=False),
+            pa.field('position_delete_file_count', pa.int32(), nullable=False),
+            pa.field('equality_delete_record_count', pa.int64(), nullable=False),
+            pa.field('equality_delete_file_count', pa.int32(), nullable=False),
+            pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=True),
+            pa.field('last_updated_snapshot_id', pa.int64(), nullable=True),
+        ])
+
+        partition_record = self.tbl.metadata.specs_struct()
+        has_partitions = len(partition_record.fields) > 0
+
+        if has_partitions:
+            pa_record_struct = schema_to_pyarrow(partition_record)
+            partitions_schema = pa.schema([
+                pa.field('partition', pa_record_struct, nullable=False),
+                pa.field('spec_id', pa.int32(), nullable=False),
+            ])
+
+            table_schema = pa.unify_schemas([partitions_schema, table_schema])
+
+        def update_partitions_map(
+            partitions_map: Dict[Tuple[str, Any], Any],
+            file: DataFile,
+            partition_record_dict: Dict[str, Any],
+            snapshot: Optional[Snapshot],
+        ) -> None:
+            partition_record_key = _convert_to_hashable_type(partition_record_dict)
+            if partition_record_key not in partitions_map:
+                partitions_map[partition_record_key] = {
+                    "partition": partition_record_dict,
+                    "spec_id": file.spec_id,
+                    "record_count": 0,
+                    "file_count": 0,
+                    "total_data_file_size_in_bytes": 0,
+                    "position_delete_record_count": 0,
+                    "position_delete_file_count": 0,
+                    "equality_delete_record_count": 0,
+                    "equality_delete_file_count": 0,
+                    "last_updated_at": snapshot.timestamp_ms if snapshot else 
None,
+                    "last_updated_snapshot_id": snapshot.snapshot_id if 
snapshot else None,
+                }
+
+            partition_row = partitions_map[partition_record_key]
+
+            if snapshot is not None:
+                if partition_row["last_updated_at"] is None or 
partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
+                    partition_row["last_updated_at"] = snapshot.timestamp_ms
+                    partition_row["last_updated_snapshot_id"] = 
snapshot.snapshot_id
+
+            if file.content == DataFileContent.DATA:
+                partition_row["record_count"] += file.record_count
+                partition_row["file_count"] += 1
+                partition_row["total_data_file_size_in_bytes"] += 
file.file_size_in_bytes
+            elif file.content == DataFileContent.POSITION_DELETES:
+                partition_row["position_delete_record_count"] += 
file.record_count
+                partition_row["position_delete_file_count"] += 1
+            elif file.content == DataFileContent.EQUALITY_DELETES:
+                partition_row["equality_delete_record_count"] += 
file.record_count
+                partition_row["equality_delete_file_count"] += 1
+            else:
+                raise ValueError(f"Unknown DataFileContent ({file.content})")
+
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        snapshot = self._get_snapshot(snapshot_id)
+        for manifest in snapshot.manifests(self.tbl.io):
+            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+                partition = entry.data_file.partition
+                partition_record_dict = {
+                    field.name: partition[pos]
+                    for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+                }
+                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
+                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
+
+        return pa.Table.from_pylist(
+            partitions_map.values(),
+            schema=table_schema,
+        )
+
 
 @dataclass(frozen=True)
 class TablePartition:
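
The aggregation above groups manifest entries by their partition values: each file's partition dict is turned into a hashable key, per-partition counters are accumulated in a plain dict, and the rows are then handed to `pa.Table.from_pylist`. A simplified, self-contained sketch of that grouping pattern, with a local `hashable_key` standing in for the internal `_convert_to_hashable_type`:

```python
from typing import Any, Dict, List, Tuple

PartitionKey = Tuple[Tuple[str, Any], ...]

def hashable_key(partition: Dict[str, Any]) -> PartitionKey:
    # Stand-in for pyiceberg's internal _convert_to_hashable_type: a dict cannot
    # be used as a dict key, so use a sorted tuple of (field name, value) pairs.
    return tuple(sorted(partition.items()))

def aggregate(data_files: List[Tuple[Dict[str, Any], int, int]]) -> List[Dict[str, Any]]:
    # Each element is (partition values, record_count, file_size_in_bytes) for one data file.
    rows: Dict[PartitionKey, Dict[str, Any]] = {}
    for partition, record_count, file_size in data_files:
        row = rows.setdefault(
            hashable_key(partition),
            {"partition": partition, "record_count": 0, "file_count": 0, "total_data_file_size_in_bytes": 0},
        )
        row["record_count"] += record_count
        row["file_count"] += 1
        row["total_data_file_size_in_bytes"] += file_size
    return list(rows.values())

# Two files in the same partition collapse into one aggregated row.
print(aggregate([({"dt_month": 612}, 3, 1260), ({"dt_month": 612}, 1, 641)]))
```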
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 7cbfc6da..d9ec5634 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -270,3 +270,123 @@ def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: Catal
 
     assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day': date(2021, 2, 1), 'dt_month': None}
     assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 'dt_month': 612}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_partitions_unpartitioned(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_partitions_unpartitioned"
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    # Write some data through multiple commits
+    tbl.append(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.partitions()
+    assert df.column_names == [
+        'record_count',
+        'file_count',
+        'total_data_file_size_in_bytes',
+        'position_delete_record_count',
+        'position_delete_file_count',
+        'equality_delete_record_count',
+        'equality_delete_file_count',
+        'last_updated_at',
+        'last_updated_snapshot_id',
+    ]
+    for last_updated_at in df['last_updated_at']:
+        assert isinstance(last_updated_at.as_py(), datetime)
+
+    int_cols = [
+        'record_count',
+        'file_count',
+        'total_data_file_size_in_bytes',
+        'position_delete_record_count',
+        'position_delete_file_count',
+        'equality_delete_record_count',
+        'equality_delete_file_count',
+        'last_updated_snapshot_id',
+    ]
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+    lhs = df.to_pandas()
+    rhs = spark.table(f"{identifier}.partitions").toPandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_partitions_partitioned"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            name string,
+            dt date
+        )
+        PARTITIONED BY (months(dt))
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('John', CAST('2021-01-01' AS date))
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('Doe', CAST('2021-01-05' AS date))
+    """
+    )
+
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        REPLACE PARTITION FIELD dt_month WITH days(dt)
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('Jenny', CAST('2021-02-01' AS date))
+    """
+    )
+
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        DROP PARTITION FIELD dt_day
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('James', CAST('2021-02-01' AS date))
+    """
+    )
+
+    def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
+        lhs = df.to_pandas().sort_values('spec_id')
+        rhs = spark_df.toPandas().sort_values('spec_id')
+        for column in df.column_names:
+            for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+                if column == "partition":
+                    right = right.asDict()
+                assert left == right, f"Difference in column {column}: {left} != {right}"
+
+    tbl = session_catalog.load_table(identifier)
+    for snapshot in tbl.metadata.snapshots:
+        df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
+        spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION 
AS OF {snapshot.snapshot_id}")
+        check_pyiceberg_df_equals_spark_df(df, spark_df)
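
The partitioned test above checks every snapshot against Spark's `.partitions` metadata table. The same per-snapshot walk can be done interactively; a minimal sketch, assuming `tbl` is an already-loaded pyiceberg table:

```python
# Partition metadata is derived from the manifests reachable from the chosen
# snapshot, so earlier snapshots may report fewer partitions than the current one.
for snapshot in tbl.metadata.snapshots:
    partitions_at_snapshot = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
    print(snapshot.snapshot_id, partitions_at_snapshot.num_rows)
```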
