This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new eba4beef Initial implementation of the manifest table (#717)
eba4beef is described below
commit eba4beeff046dd92d234fe7779fdbe76d61bd1bf
Author: Drew Gallardo <[email protected]>
AuthorDate: Thu May 23 02:39:26 2024 -0700
Initial implementation of the manifest table (#717)
---
mkdocs/docs/api.md | 50 ++++++++++++++++++
pyiceberg/table/__init__.py | 89 +++++++++++++++++++++++++++++++++
tests/integration/test_inspect_table.py | 83 ++++++++++++++++++++++++++++++
3 files changed, 222 insertions(+)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0bc23fb0..70b5fd62 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -606,6 +606,56 @@ min_snapshots_to_keep: [[null,10]]
max_snapshot_age_in_ms: [[null,604800000]]
```
+### Manifests
+
+To show a table's current file manifests:
+
+```python
+table.inspect.manifests()
+```
+
+```
+pyarrow.Table
+content: int8 not null
+path: string not null
+length: int64 not null
+partition_spec_id: int32 not null
+added_snapshot_id: int64 not null
+added_data_files_count: int32 not null
+existing_data_files_count: int32 not null
+deleted_data_files_count: int32 not null
+added_delete_files_count: int32 not null
+existing_delete_files_count: int32 not null
+deleted_delete_files_count: int32 not null
+partition_summaries: list<item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>> not null
+ child 0, item: struct<contains_null: bool not null, contains_nan: bool, lower_bound: string, upper_bound: string>
+ child 0, contains_null: bool not null
+ child 1, contains_nan: bool
+ child 2, lower_bound: string
+ child 3, upper_bound: string
+----
+content: [[0]]
+path: [["s3://warehouse/default/table_metadata_manifests/metadata/3bf5b4c6-a7a4-4b43-a6ce-ca2b4887945a-m0.avro"]]
+length: [[6886]]
+partition_spec_id: [[0]]
+added_snapshot_id: [[3815834705531553721]]
+added_data_files_count: [[1]]
+existing_data_files_count: [[0]]
+deleted_data_files_count: [[0]]
+added_delete_files_count: [[0]]
+existing_delete_files_count: [[0]]
+deleted_delete_files_count: [[0]]
+partition_summaries: [[ -- is_valid: all not null
+ -- child 0 type: bool
+[false]
+ -- child 1 type: bool
+[false]
+ -- child 2 type: string
+["test"]
+ -- child 3 type: string
+["test"]]]
+```
+
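+The result is a plain `pyarrow.Table`, so the usual Arrow tooling applies downstream. As a minimal, illustrative sketch (not part of the API itself), you could filter for delete manifests or hand the table to pandas:
+
+```python
+import pyarrow.compute as pc
+
+manifests = table.inspect.manifests()
+
+# Per the Iceberg spec, content == 0 marks a data manifest and content == 1 a delete manifest.
+delete_manifests = manifests.filter(pc.equal(manifests["content"], 1))
+
+# Convert to pandas for ad-hoc analysis.
+manifests_df = manifests.to_pandas()
+```
+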
## Add Files
Expert Iceberg users may choose to commit existing Parquet files to the Iceberg table as data files, without rewriting them.
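
For instance, a minimal sketch using the `add_files` API (the table identifier and Parquet paths below are hypothetical, and the files must match the table's schema):

```python
table = catalog.load_table("default.my_table")  # hypothetical identifier

# Commit pre-existing Parquet files to the table as data files, without rewriting them.
table.add_files(file_paths=[
    "s3://warehouse/default/data/existing-1.parquet",
    "s3://warehouse/default/data/existing-2.parquet",
])
```
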
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c57f0d12..74b0225d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -71,6 +71,7 @@ from pyiceberg.manifest import (
ManifestEntry,
ManifestEntryStatus,
ManifestFile,
+ PartitionFieldSummary,
write_manifest,
write_manifest_list,
)
@@ -3547,6 +3548,94 @@ class InspectTable:
schema=table_schema,
)
+ def manifests(self) -> "pa.Table":
+ import pyarrow as pa
+
+ from pyiceberg.conversions import from_bytes
+
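+ # Arrow struct mirroring the PartitionFieldSummary stored in each manifest entry.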
+ partition_summary_schema = pa.struct([
+ pa.field("contains_null", pa.bool_(), nullable=False),
+ pa.field("contains_nan", pa.bool_(), nullable=True),
+ pa.field("lower_bound", pa.string(), nullable=True),
+ pa.field("upper_bound", pa.string(), nullable=True),
+ ])
+
+ manifest_schema = pa.schema([
+ pa.field('content', pa.int8(), nullable=False),
+ pa.field('path', pa.string(), nullable=False),
+ pa.field('length', pa.int64(), nullable=False),
+ pa.field('partition_spec_id', pa.int32(), nullable=False),
+ pa.field('added_snapshot_id', pa.int64(), nullable=False),
+ pa.field('added_data_files_count', pa.int32(), nullable=False),
+ pa.field('existing_data_files_count', pa.int32(), nullable=False),
+ pa.field('deleted_data_files_count', pa.int32(), nullable=False),
+ pa.field('added_delete_files_count', pa.int32(), nullable=False),
+ pa.field('existing_delete_files_count', pa.int32(), nullable=False),
+ pa.field('deleted_delete_files_count', pa.int32(), nullable=False),
+ pa.field('partition_summaries', pa.list_(partition_summary_schema), nullable=False),
+ ])
+
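+ # Decode each PartitionFieldSummary into a row dict, rendering raw byte bounds
+ # as human-readable strings via the partition field's transform.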
+ def _partition_summaries_to_rows(
+ spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]
+ ) -> List[Dict[str, Any]]:
+ rows = []
+ for i, field_summary in enumerate(partition_summaries):
+ field = spec.fields[i]
+ partition_field_type = spec.partition_type(self.tbl.schema()).fields[i].field_type
+ lower_bound = (
+ (
+ field.transform.to_human_string(
+ partition_field_type, from_bytes(partition_field_type, field_summary.lower_bound)
+ )
+ )
+ if field_summary.lower_bound
+ else None
+ )
+ upper_bound = (
+ (
+ field.transform.to_human_string(
+ partition_field_type, from_bytes(partition_field_type, field_summary.upper_bound)
+ )
+ )
+ if field_summary.upper_bound
+ else None
+ )
+ rows.append({
+ 'contains_null': field_summary.contains_null,
+ 'contains_nan': field_summary.contains_nan,
+ 'lower_bound': lower_bound,
+ 'upper_bound': upper_bound,
+ })
+ return rows
+
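+ # Walk the manifest list of the current snapshot; file counts are bucketed
+ # into data vs. delete columns based on each manifest's content type.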
+ specs = self.tbl.metadata.specs()
+ manifests = []
+ if snapshot := self.tbl.metadata.current_snapshot():
+ for manifest in snapshot.manifests(self.tbl.io):
+ is_data_file = manifest.content == ManifestContent.DATA
+ is_delete_file = manifest.content == ManifestContent.DELETES
+ manifests.append({
+ 'content': manifest.content,
+ 'path': manifest.manifest_path,
+ 'length': manifest.manifest_length,
+ 'partition_spec_id': manifest.partition_spec_id,
+ 'added_snapshot_id': manifest.added_snapshot_id,
+ 'added_data_files_count': manifest.added_files_count if is_data_file else 0,
+ 'existing_data_files_count': manifest.existing_files_count if is_data_file else 0,
+ 'deleted_data_files_count': manifest.deleted_files_count if is_data_file else 0,
+ 'added_delete_files_count': manifest.added_files_count if is_delete_file else 0,
+ 'existing_delete_files_count': manifest.existing_files_count if is_delete_file else 0,
+ 'deleted_delete_files_count': manifest.deleted_files_count if is_delete_file else 0,
+ 'partition_summaries': _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+ if manifest.partitions
+ else [],
+ })
+
+ return pa.Table.from_pylist(
+ manifests,
+ schema=manifest_schema,
+ )
+
@dataclass(frozen=True)
class TablePartition:
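
The trickiest step in the new method is decoding partition bounds: manifests store lower and upper bounds as raw bytes, which `from_bytes` turns into a typed value that the partition field's transform then renders as text. A standalone sketch of the same two-step conversion (the type and value here are illustrative):

```python
from pyiceberg.conversions import from_bytes
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import StringType

raw = b"test"  # bounds arrive as raw bytes from the manifest
value = from_bytes(StringType(), raw)  # decode to a typed value: "test"
human = IdentityTransform().to_human_string(StringType(), value)  # render as text: "test"
```
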
diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py
index a884f9d4..8665435e 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -445,3 +445,86 @@ def test_inspect_partitions_partitioned(spark: SparkSession, session_catalog: Ca
df = tbl.inspect.partitions(snapshot_id=snapshot.snapshot_id)
spark_df = spark.sql(f"SELECT * FROM {identifier}.partitions VERSION
AS OF {snapshot.snapshot_id}")
check_pyiceberg_df_equals_spark_df(df, spark_df)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+ identifier = "default.table_metadata_manifests"
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ spark.sql(
+ f"""
+ CREATE TABLE {identifier} (
+ id int,
+ data string
+ )
+ PARTITIONED BY (data)
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {identifier} VALUES (1, "a")
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {identifier} VALUES (2, "b")
+ """
+ )
+
+ df = session_catalog.load_table(identifier).inspect.manifests()
+
+ assert df.column_names == [
+ 'content',
+ 'path',
+ 'length',
+ 'partition_spec_id',
+ 'added_snapshot_id',
+ 'added_data_files_count',
+ 'existing_data_files_count',
+ 'deleted_data_files_count',
+ 'added_delete_files_count',
+ 'existing_delete_files_count',
+ 'deleted_delete_files_count',
+ 'partition_summaries',
+ ]
+
+ int_cols = [
+ 'content',
+ 'length',
+ 'partition_spec_id',
+ 'added_snapshot_id',
+ 'added_data_files_count',
+ 'existing_data_files_count',
+ 'deleted_data_files_count',
+ 'added_delete_files_count',
+ 'existing_delete_files_count',
+ 'deleted_delete_files_count',
+ ]
+
+ for column in int_cols:
+ for value in df[column]:
+ assert isinstance(value.as_py(), int)
+
+ for value in df["path"]:
+ assert isinstance(value.as_py(), str)
+
+ for value in df["partition_summaries"]:
+ assert isinstance(value.as_py(), list)
+ for row in value:
+ assert isinstance(row["contains_null"].as_py(), bool)
+ assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
+ assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
+ assert isinstance(row["upper_bound"].as_py(), (str, type(None)))
+
+ lhs = spark.table(f"{identifier}.manifests").toPandas()
+ rhs = df.to_pandas()
+ for column in df.column_names:
+ for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+ assert left == right, f"Difference in column {column}: {left} !=
{right}"