This is an automated email from the ASF dual-hosted git repository.
honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 49ac3a27 Add Partitions Metadata Table (#603)
49ac3a27 is described below
commit 49ac3a27794fc12cfb67b29502ba92b429396201
Author: Sung Yun <[email protected]>
AuthorDate: Tue Apr 16 02:22:24 2024 -0400
Add Partitions Metadata Table (#603)
---
mkdocs/docs/api.md | 41 +++++++++++
pyiceberg/table/__init__.py | 89 +++++++++++++++++++++++
tests/integration/test_inspect_table.py | 120 ++++++++++++++++++++++++++++++++
3 files changed, 250 insertions(+)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 9bdb6dcd..cf4276ed 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -382,6 +382,47 @@ manifest_list:
[["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
summary:
[[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","54
[...]
```
+### Partitions
+
+Inspect the partitions of the table:
+
+```python
+table.inspect.partitions()
+```
+
+```
+pyarrow.Table
+partition: struct<dt_month: int32, dt_day: date32[day]> not null
+ child 0, dt_month: int32
+ child 1, dt_day: date32[day]
+spec_id: int32 not null
+record_count: int64 not null
+file_count: int32 not null
+total_data_file_size_in_bytes: int64 not null
+position_delete_record_count: int64 not null
+position_delete_file_count: int32 not null
+equality_delete_record_count: int64 not null
+equality_delete_file_count: int32 not null
+last_updated_at: timestamp[ms]
+last_updated_snapshot_id: int64
+----
+partition: [
+ -- is_valid: all not null
+ -- child 0 type: int32
+[null,null,612]
+ -- child 1 type: date32[day]
+[null,2021-02-01,null]]
+spec_id: [[2,1,0]]
+record_count: [[1,1,2]]
+file_count: [[1,1,2]]
+total_data_file_size_in_bytes: [[641,641,1260]]
+position_delete_record_count: [[0,0,0]]
+position_delete_file_count: [[0,0,0]]
+equality_delete_record_count: [[0,0,0]]
+equality_delete_file_count: [[0,0,0]]
+last_updated_at: [[2024-04-13 18:59:35.981,2024-04-13 18:59:35.465,2024-04-13 18:59:35.003]]
+```
+
### Entries
To show all the table's current manifest entries for both data and delete
files.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index da4b1465..95fdb1d2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -137,6 +137,7 @@ from pyiceberg.types import (
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.datetime import datetime_to_millis
+from pyiceberg.utils.singleton import _convert_to_hashable_type
if TYPE_CHECKING:
import daft
@@ -3422,6 +3423,94 @@ class InspectTable:
schema=entries_schema,
)
+    def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+        """Summarise the files of one snapshot per partition as a PyArrow table.
+
+        Every manifest entry of the selected snapshot is folded into one row per
+        distinct partition record, counting data files, position-delete files and
+        equality-delete files separately.
+
+        Args:
+            snapshot_id: Forwarded to ``self._get_snapshot`` to select the snapshot
+                to inspect (presumably the current snapshot when ``None`` - confirm
+                against ``_get_snapshot``).
+
+        Returns:
+            A ``pa.Table`` with the count/size columns plus ``last_updated_at`` and
+            ``last_updated_snapshot_id``; ``partition`` and ``spec_id`` columns are
+            placed in front only when the table's specs define at least one
+            partition field.
+
+        Raises:
+            ValueError: If a manifest entry's data file has an unknown content type.
+        """
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+        table_schema = pa.schema([
+            pa.field('record_count', pa.int64(), nullable=False),
+            pa.field('file_count', pa.int32(), nullable=False),
+            pa.field('total_data_file_size_in_bytes', pa.int64(), nullable=False),
+            pa.field('position_delete_record_count', pa.int64(), nullable=False),
+            pa.field('position_delete_file_count', pa.int32(), nullable=False),
+            pa.field('equality_delete_record_count', pa.int64(), nullable=False),
+            pa.field('equality_delete_file_count', pa.int32(), nullable=False),
+            pa.field('last_updated_at', pa.timestamp(unit='ms'), nullable=True),
+            pa.field('last_updated_snapshot_id', pa.int64(), nullable=True),
+        ])
+
+        partition_record = self.tbl.metadata.specs_struct()
+        has_partitions = len(partition_record.fields) > 0
+
+        if has_partitions:
+            pa_record_struct = schema_to_pyarrow(partition_record)
+            partitions_schema = pa.schema([
+                pa.field('partition', pa_record_struct, nullable=False),
+                pa.field('spec_id', pa.int32(), nullable=False),
+            ])
+
+            # Partition columns go first, followed by the metric columns.
+            table_schema = pa.unify_schemas([partitions_schema, table_schema])
+
+        def update_partitions_map(
+            partitions_map: Dict[Tuple[str, Any], Any],
+            file: DataFile,
+            partition_record_dict: Dict[str, Any],
+            snapshot: Optional[Snapshot],
+        ) -> None:
+            """Fold one data/delete file into the row of its partition."""
+            partition_record_key = _convert_to_hashable_type(partition_record_dict)
+            if partition_record_key not in partitions_map:
+                partitions_map[partition_record_key] = {
+                    "partition": partition_record_dict,
+                    "spec_id": file.spec_id,
+                    "record_count": 0,
+                    "file_count": 0,
+                    "total_data_file_size_in_bytes": 0,
+                    "position_delete_record_count": 0,
+                    "position_delete_file_count": 0,
+                    "equality_delete_record_count": 0,
+                    "equality_delete_file_count": 0,
+                    "last_updated_at": snapshot.timestamp_ms if snapshot else None,
+                    "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
+                }
+
+            partition_row = partitions_map[partition_record_key]
+
+            if snapshot is not None:
+                # Compare timestamp with timestamp (not snapshot id with timestamp),
+                # so the entry written by the most recent snapshot wins.
+                if partition_row["last_updated_at"] is None or partition_row["last_updated_at"] < snapshot.timestamp_ms:
+                    partition_row["last_updated_at"] = snapshot.timestamp_ms
+                    partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
+
+            if file.content == DataFileContent.DATA:
+                partition_row["record_count"] += file.record_count
+                partition_row["file_count"] += 1
+                partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
+            elif file.content == DataFileContent.POSITION_DELETES:
+                partition_row["position_delete_record_count"] += file.record_count
+                partition_row["position_delete_file_count"] += 1
+            elif file.content == DataFileContent.EQUALITY_DELETES:
+                partition_row["equality_delete_record_count"] += file.record_count
+                partition_row["equality_delete_file_count"] += 1
+            else:
+                raise ValueError(f"Unknown DataFileContent ({file.content})")
+
+        partitions_map: Dict[Tuple[str, Any], Any] = {}
+        snapshot = self._get_snapshot(snapshot_id)
+        # The spec lookup does not change while scanning; build it once.
+        specs = self.tbl.metadata.specs()
+        for manifest in snapshot.manifests(self.tbl.io):
+            spec_fields = specs[manifest.partition_spec_id].fields
+            for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+                partition = entry.data_file.partition
+                partition_record_dict = {field.name: partition[pos] for pos, field in enumerate(spec_fields)}
+                entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
+                update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
+
+        return pa.Table.from_pylist(
+            list(partitions_map.values()),
+            schema=table_schema,
+        )
+
@dataclass(frozen=True)
class TablePartition:
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index 7cbfc6da..d9ec5634 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -270,3 +270,123 @@ def test_inspect_entries_partitioned(spark: SparkSession,
session_catalog: Catal
assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day':
date(2021, 2, 1), 'dt_month': None}
assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None,
'dt_month': 612}
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_partitions_unpartitioned(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+    """Check columns, value types and Spark parity of inspect.partitions() on an unpartitioned table."""
+    identifier = "default.table_metadata_partitions_unpartitioned"
+    tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+    # Write some data through multiple commits
+    tbl.append(arrow_table_with_null)
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.partitions()
+    # No 'partition' / 'spec_id' columns are expected without partition fields.
+    assert df.column_names == [
+        'record_count',
+        'file_count',
+        'total_data_file_size_in_bytes',
+        'position_delete_record_count',
+        'position_delete_file_count',
+        'equality_delete_record_count',
+        'equality_delete_file_count',
+        'last_updated_at',
+        'last_updated_snapshot_id',
+    ]
+    for last_updated_at in df['last_updated_at']:
+        assert isinstance(last_updated_at.as_py(), datetime)
+
+    int_cols = [
+        'record_count',
+        'file_count',
+        'total_data_file_size_in_bytes',
+        'position_delete_record_count',
+        'position_delete_file_count',
+        'equality_delete_record_count',
+        'equality_delete_file_count',
+        'last_updated_snapshot_id',
+    ]
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+    # Compare value by value against Spark's own partitions metadata table.
+    lhs = df.to_pandas()
+    rhs = spark.table(f"{identifier}.partitions").toPandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            assert left == right, f"Difference in column {column}: {left} != {right}"
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    """Check inspect.partitions() against Spark for every snapshot of a table whose partition spec evolves.
+
+    NOTE(review): ``format_version`` is parametrized but not referenced in the
+    body; the table is created through Spark SQL without it.
+    """
+    identifier = "default.table_metadata_partitions_partitioned"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            name string,
+            dt date
+        )
+        PARTITIONED BY (months(dt))
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('John', CAST('2021-01-01' AS date))
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('Doe', CAST('2021-01-05' AS date))
+    """
+    )
+
+    # Switch from monthly to daily partitioning so a second spec exists.
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        REPLACE PARTITION FIELD dt_month WITH days(dt)
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('Jenny', CAST('2021-02-01' AS date))
+    """
+    )
+
+    # Drop the remaining partition field, producing a third spec.
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        DROP PARTITION FIELD dt_day
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES ('James', CAST('2021-02-01' AS date))
+    """
+    )
+
+    def check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> None:
+        # Sort both sides by spec_id so rows are compared in the same order.
+        lhs = df.to_pandas().sort_values('spec_id')
+        rhs = spark_df.toPandas().sort_values('spec_id')
+        for column in df.column_names:
+            for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+                if column == "partition":
+                    # Spark returns a Row for the struct column; compare as a dict.
+                    right = right.asDict()
+                assert left == right, f"Difference in column {column}: {left} != {right}"
+
+    tbl = session_catalog.load_table(identifier)
+    for snapshot in tbl.metadata.snapshots:
+        df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
+        spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION AS OF {snapshot.snapshot_id}")
+        check_pyiceberg_df_equals_spark_df(df, spark_df)