This is an automated email from the ASF dual-hosted git repository.
sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 41a3c8ea Add metadata tables for `data_files` and `delete_files`
(#1066)
41a3c8ea is described below
commit 41a3c8ea4e9c3592331755970bfc38f52eaf3fd8
Author: Soumya Ghosh <[email protected]>
AuthorDate: Fri Sep 20 05:51:57 2024 +0530
Add metadata tables for `data_files` and `delete_files` (#1066)
* Add metadata tables for data_files and delete_files
* Update API docs for `data_files` and `delete_files`
* Update method signature of `_files()`
* Migrate implementation of files() table from __init__.py
---
mkdocs/docs/api.md | 5 +
pyiceberg/table/inspect.py | 27 +++-
tests/integration/test_inspect_table.py | 268 +++++++++++++++++---------------
3 files changed, 169 insertions(+), 131 deletions(-)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 53a7846b..eaffb84a 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -845,6 +845,11 @@ readable_metrics: [
[6.0989]]
```
+!!! info
+ Content refers to the type of content stored by the data file: `0` - `Data`,
`1` - `Position Deletes`, `2` - `Equality Deletes`
+
+To show only data files or delete files in the current snapshot, use
`table.inspect.data_files()` and `table.inspect.delete_files()` respectively.
+
## Add Files
Expert Iceberg users may choose to commit existing parquet files to the
Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index 3f64255e..470c00f4 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -17,7 +17,7 @@
from __future__ import annotations
from datetime import datetime
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent,
PartitionFieldSummary
@@ -473,7 +473,7 @@ class InspectTable:
return pa.Table.from_pylist(history, schema=history_schema)
- def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ def _files(self, snapshot_id: Optional[int] = None, data_file_filter:
Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa
from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -530,6 +530,8 @@ class InspectTable:
for manifest_list in snapshot.manifests(io):
for manifest_entry in manifest_list.fetch_manifest_entry(io):
data_file = manifest_entry.data_file
+ if data_file_filter and data_file.content not in
data_file_filter:
+ continue
column_sizes = data_file.column_sizes or {}
value_counts = data_file.value_counts or {}
null_value_counts = data_file.null_value_counts or {}
@@ -558,12 +560,12 @@ class InspectTable:
"spec_id": data_file.spec_id,
"record_count": data_file.record_count,
"file_size_in_bytes": data_file.file_size_in_bytes,
- "column_sizes": dict(data_file.column_sizes),
- "value_counts": dict(data_file.value_counts),
- "null_value_counts": dict(data_file.null_value_counts),
- "nan_value_counts": dict(data_file.nan_value_counts),
- "lower_bounds": dict(data_file.lower_bounds),
- "upper_bounds": dict(data_file.upper_bounds),
+ "column_sizes": dict(data_file.column_sizes) if
data_file.column_sizes is not None else None,
+ "value_counts": dict(data_file.value_counts) if
data_file.value_counts is not None else None,
+ "null_value_counts": dict(data_file.null_value_counts) if
data_file.null_value_counts is not None else None,
+ "nan_value_counts": dict(data_file.nan_value_counts) if
data_file.nan_value_counts is not None else None,
+ "lower_bounds": dict(data_file.lower_bounds) if
data_file.lower_bounds is not None else None,
+ "upper_bounds": dict(data_file.upper_bounds) if
data_file.upper_bounds is not None else None,
"key_metadata": data_file.key_metadata,
"split_offsets": data_file.split_offsets,
"equality_ids": data_file.equality_ids,
@@ -575,3 +577,12 @@ class InspectTable:
files,
schema=files_schema,
)
+
+ def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ return self._files(snapshot_id)
+
+ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ return self._files(snapshot_id, {DataFileContent.DATA})
+
+ def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
+ return self._files(snapshot_id, {DataFileContent.POSITION_DELETES,
DataFileContent.EQUALITY_DELETES})
diff --git a/tests/integration/test_inspect_table.py
b/tests/integration/test_inspect_table.py
index 9f632258..68b10f32 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -672,126 +672,141 @@ def test_inspect_files(
# append more data
tbl.append(arrow_table_with_null)
- df = tbl.refresh().inspect.files()
+ # configure table properties
+ if format_version == 2:
+ with tbl.transaction() as txn:
+ txn.set_properties({"write.delete.mode": "merge-on-read"})
+ spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
- assert df.column_names == [
- "content",
- "file_path",
- "file_format",
- "spec_id",
- "record_count",
- "file_size_in_bytes",
- "column_sizes",
- "value_counts",
- "null_value_counts",
- "nan_value_counts",
- "lower_bounds",
- "upper_bounds",
- "key_metadata",
- "split_offsets",
- "equality_ids",
- "sort_order_id",
- "readable_metrics",
- ]
-
- # make sure the non-nullable fields are filled
- for int_column in ["content", "spec_id", "record_count",
"file_size_in_bytes"]:
- for value in df[int_column]:
- assert isinstance(value.as_py(), int)
-
- for split_offsets in df["split_offsets"]:
- assert isinstance(split_offsets.as_py(), list)
-
- for file_format in df["file_format"]:
- assert file_format.as_py() == "PARQUET"
+ files_df = tbl.refresh().inspect.files()
- for file_path in df["file_path"]:
- assert file_path.as_py().startswith("s3://")
+ data_files_df = tbl.inspect.data_files()
- lhs = df.to_pandas()
- rhs = spark.table(f"{identifier}.files").toPandas()
+ delete_files_df = tbl.inspect.delete_files()
- lhs_subset = lhs[
- [
+ def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None:
+ assert df.column_names == [
"content",
"file_path",
"file_format",
"spec_id",
"record_count",
"file_size_in_bytes",
+ "column_sizes",
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ "key_metadata",
"split_offsets",
"equality_ids",
"sort_order_id",
+ "readable_metrics",
]
- ]
- rhs_subset = rhs[
- [
- "content",
- "file_path",
- "file_format",
- "spec_id",
- "record_count",
- "file_size_in_bytes",
- "split_offsets",
- "equality_ids",
- "sort_order_id",
+
+ # make sure the non-nullable fields are filled
+ for int_column in ["content", "spec_id", "record_count",
"file_size_in_bytes"]:
+ for value in df[int_column]:
+ assert isinstance(value.as_py(), int)
+
+ for split_offsets in df["split_offsets"]:
+ assert isinstance(split_offsets.as_py(), list)
+
+ for file_format in df["file_format"]:
+ assert file_format.as_py() == "PARQUET"
+
+ for file_path in df["file_path"]:
+ assert file_path.as_py().startswith("s3://")
+
+ lhs = df.to_pandas()
+ rhs = spark_df.toPandas()
+
+ lhs_subset = lhs[
+ [
+ "content",
+ "file_path",
+ "file_format",
+ "spec_id",
+ "record_count",
+ "file_size_in_bytes",
+ "split_offsets",
+ "equality_ids",
+ "sort_order_id",
+ ]
+ ]
+ rhs_subset = rhs[
+ [
+ "content",
+ "file_path",
+ "file_format",
+ "spec_id",
+ "record_count",
+ "file_size_in_bytes",
+ "split_offsets",
+ "equality_ids",
+ "sort_order_id",
+ ]
]
- ]
- assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False,
check_categorical=False)
+ assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False,
check_categorical=False)
- for column in df.column_names:
- for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
- if isinstance(left, float) and math.isnan(left) and
isinstance(right, float) and math.isnan(right):
- # NaN != NaN in Python
- continue
- if column in [
- "column_sizes",
- "value_counts",
- "null_value_counts",
- "nan_value_counts",
- "lower_bounds",
- "upper_bounds",
- ]:
- if isinstance(right, dict):
- left = dict(left)
- assert left == right, f"Difference in column {column}: {left}
!= {right}"
+ for column in df.column_names:
+ for left, right in zip(lhs[column].to_list(),
rhs[column].to_list()):
+ if isinstance(left, float) and math.isnan(left) and
isinstance(right, float) and math.isnan(right):
+ # NaN != NaN in Python
+ continue
+ if column in [
+ "column_sizes",
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ ]:
+ if isinstance(right, dict):
+ left = dict(left)
+ assert left == right, f"Difference in column {column}:
{left} != {right}"
- elif column == "readable_metrics":
- assert list(left.keys()) == [
- "bool",
- "string",
- "string_long",
- "int",
- "long",
- "float",
- "double",
- "timestamp",
- "timestamptz",
- "date",
- "binary",
- "fixed",
- ]
- assert left.keys() == right.keys()
-
- for rm_column in left.keys():
- rm_lhs = left[rm_column]
- rm_rhs = right[rm_column]
-
- assert rm_lhs["column_size"] == rm_rhs["column_size"]
- assert rm_lhs["value_count"] == rm_rhs["value_count"]
- assert rm_lhs["null_value_count"] ==
rm_rhs["null_value_count"]
- assert rm_lhs["nan_value_count"] ==
rm_rhs["nan_value_count"]
-
- if rm_column == "timestamptz":
- # PySpark does not correctly set the timstamptz
- rm_rhs["lower_bound"] =
rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
- rm_rhs["upper_bound"] =
rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
-
- assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
- assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
- else:
- assert left == right, f"Difference in column {column}: {left}
!= {right}"
+ elif column == "readable_metrics":
+ assert list(left.keys()) == [
+ "bool",
+ "string",
+ "string_long",
+ "int",
+ "long",
+ "float",
+ "double",
+ "timestamp",
+ "timestamptz",
+ "date",
+ "binary",
+ "fixed",
+ ]
+ assert left.keys() == right.keys()
+
+ for rm_column in left.keys():
+ rm_lhs = left[rm_column]
+ rm_rhs = right[rm_column]
+
+ assert rm_lhs["column_size"] == rm_rhs["column_size"]
+ assert rm_lhs["value_count"] == rm_rhs["value_count"]
+ assert rm_lhs["null_value_count"] ==
rm_rhs["null_value_count"]
+ assert rm_lhs["nan_value_count"] ==
rm_rhs["nan_value_count"]
+
+ if rm_column == "timestamptz" and
rm_rhs["lower_bound"] and rm_rhs["upper_bound"]:
+ # PySpark does not correctly set the timestamptz
+ rm_rhs["lower_bound"] =
rm_rhs["lower_bound"].replace(tzinfo=pytz.utc)
+ rm_rhs["upper_bound"] =
rm_rhs["upper_bound"].replace(tzinfo=pytz.utc)
+
+ assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"]
+ assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"]
+ else:
+ assert left == right, f"Difference in column {column}:
{left} != {right}"
+
+ inspect_files_asserts(files_df, spark.table(f"{identifier}.files"))
+ inspect_files_asserts(data_files_df,
spark.table(f"{identifier}.data_files"))
+ inspect_files_asserts(delete_files_df,
spark.table(f"{identifier}.delete_files"))
@pytest.mark.integration
@@ -801,26 +816,33 @@ def test_inspect_files_no_snapshot(spark: SparkSession,
session_catalog: Catalog
tbl = _create_table(session_catalog, identifier,
properties={"format-version": format_version})
- df = tbl.refresh().inspect.files()
+ files_df = tbl.refresh().inspect.files()
+ data_files_df = tbl.inspect.data_files()
+ delete_files_df = tbl.inspect.delete_files()
- assert df.column_names == [
- "content",
- "file_path",
- "file_format",
- "spec_id",
- "record_count",
- "file_size_in_bytes",
- "column_sizes",
- "value_counts",
- "null_value_counts",
- "nan_value_counts",
- "lower_bounds",
- "upper_bounds",
- "key_metadata",
- "split_offsets",
- "equality_ids",
- "sort_order_id",
- "readable_metrics",
- ]
+ def inspect_files_asserts(df: pa.Table) -> None:
+ assert df.column_names == [
+ "content",
+ "file_path",
+ "file_format",
+ "spec_id",
+ "record_count",
+ "file_size_in_bytes",
+ "column_sizes",
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ "key_metadata",
+ "split_offsets",
+ "equality_ids",
+ "sort_order_id",
+ "readable_metrics",
+ ]
+
+ assert df.to_pandas().empty is True
- assert df.to_pandas().empty is True
+ inspect_files_asserts(files_df)
+ inspect_files_asserts(data_files_df)
+ inspect_files_asserts(delete_files_df)