This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new ee4dd920 Add entries metadata table (#551)
ee4dd920 is described below

commit ee4dd92008ffa21067837fe9fcdfaea063a514f3
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Apr 4 20:46:23 2024 +0200

    Add entries metadata table (#551)
    
    * Add entries metadata table
    
    * lint
    
    * Revert typedef changes
    
    * Remove unrelated changes
    
    * Fix the CI
    
    * Add docs
---
 mkdocs/docs/api.md                      | 160 ++++++++++++++++++-
 pyiceberg/table/__init__.py             | 124 +++++++++++++++
 pyiceberg/table/metadata.py             |  27 +++-
 pyiceberg/utils/lazydict.py             |   4 +
 tests/conftest.py                       |   2 +-
 tests/integration/test_inspect_table.py | 268 ++++++++++++++++++++++++++++++++
 tests/integration/test_writes.py        |  28 ----
 7 files changed, 582 insertions(+), 31 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index c8620af7..15931d02 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -370,7 +370,165 @@ manifest_list: 
[["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
 summary: 
[[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","54
 [...]
 ```
 
-### Add Files
+### Entries
+
+To show all the table's current manifest entries for both data and delete 
files.
+
+```python
+table.inspect.entries()
+```
+
+```
+pyarrow.Table
+status: int8 not null
+snapshot_id: int64 not null
+sequence_number: int64 not null
+file_sequence_number: int64 not null
+data_file: struct<content: int8 not null, file_path: string not null, 
file_format: string not null, partition: struct<> not null, record_count: int64 
not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, 
value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, 
nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, 
upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: 
list<item: int64>, equality_ids: list<item: int32> [...]
+  child 0, content: int8 not null
+  child 1, file_path: string not null
+  child 2, file_format: string not null
+  child 3, partition: struct<> not null
+  child 4, record_count: int64 not null
+  child 5, file_size_in_bytes: int64 not null
+  child 6, column_sizes: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 7, value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 8, null_value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 9, nan_value_counts: map<int32, int64>
+      child 0, entries: struct<key: int32 not null, value: int64> not null
+          child 0, key: int32 not null
+          child 1, value: int64
+  child 10, lower_bounds: map<int32, binary>
+      child 0, entries: struct<key: int32 not null, value: binary> not null
+          child 0, key: int32 not null
+          child 1, value: binary
+  child 11, upper_bounds: map<int32, binary>
+      child 0, entries: struct<key: int32 not null, value: binary> not null
+          child 0, key: int32 not null
+          child 1, value: binary
+  child 12, key_metadata: binary
+  child 13, split_offsets: list<item: int64>
+      child 0, item: int64
+  child 14, equality_ids: list<item: int32>
+      child 0, item: int32
+  child 15, sort_order_id: int32
+readable_metrics: struct<city: struct<column_size: int64, value_count: int64, 
null_value_count: int64, nan_value_count: int64, lower_bound: string, 
upper_bound: string> not null, lat: struct<column_size: int64, value_count: 
int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, 
upper_bound: double> not null, long: struct<column_size: int64, value_count: 
int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, 
upper_bound: double> not null>
+  child 0, city: struct<column_size: int64, value_count: int64, 
null_value_count: int64, nan_value_count: int64, lower_bound: string, 
upper_bound: string> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: string
+      child 5, upper_bound: string
+  child 1, lat: struct<column_size: int64, value_count: int64, 
null_value_count: int64, nan_value_count: int64, lower_bound: double, 
upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+  child 2, long: struct<column_size: int64, value_count: int64, 
null_value_count: int64, nan_value_count: int64, lower_bound: double, 
upper_bound: double> not null
+      child 0, column_size: int64
+      child 1, value_count: int64
+      child 2, null_value_count: int64
+      child 3, nan_value_count: int64
+      child 4, lower_bound: double
+      child 5, upper_bound: double
+----
+status: [[1]]
+snapshot_id: [[6245626162224016531]]
+sequence_number: [[1]]
+file_sequence_number: [[1]]
+data_file: [
+  -- is_valid: all not null
+  -- child 0 type: int8
+[0]
+  -- child 1 type: string
+["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
+  -- child 2 type: string
+["PARQUET"]
+  -- child 3 type: struct<>
+    -- is_valid: all not null
+  -- child 4 type: int64
+[4]
+  -- child 5 type: int64
+[1656]
+  -- child 6 type: map<int32, int64>
+[keys:[1,2,3]values:[140,135,135]]
+  -- child 7 type: map<int32, int64>
+[keys:[1,2,3]values:[4,4,4]]
+  -- child 8 type: map<int32, int64>
+[keys:[1,2,3]values:[0,0,0]]
+  -- child 9 type: map<int32, int64>
+[keys:[]values:[]]
+  -- child 10 type: map<int32, binary>
+[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
+  -- child 11 type: map<int32, binary>
+[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
+  -- child 12 type: binary
+[null]
+  -- child 13 type: list<item: int64>
+[[4]]
+  -- child 14 type: list<item: int32>
+[null]
+  -- child 15 type: int32
+[null]]
+readable_metrics: [
+  -- is_valid: all not null
+  -- child 0 type: struct<column_size: int64, value_count: int64, 
null_value_count: int64, nan_value_count: int64, lower_bound: string, 
upper_bound: string>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[140]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: string
+["Amsterdam"]
+    -- child 5 type: string
+["San Francisco"]
+  -- child 1 type: struct<column_size: int64, value_count: int64, 
null_value_count: int64, nan_value_count: int64, lower_bound: double, 
upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[37.773972]
+    -- child 5 type: double
+[53.11254]
+  -- child 2 type: struct<column_size: int64, value_count: int64, 
null_value_count: int64, nan_value_count: int64, lower_bound: double, 
upper_bound: double>
+    -- is_valid: all not null
+    -- child 0 type: int64
+[135]
+    -- child 1 type: int64
+[4]
+    -- child 2 type: int64
+[0]
+    -- child 3 type: int64
+[null]
+    -- child 4 type: double
+[-122.431297]
+    -- child 5 type: double
+[6.0989]]
+```
+
+## Add Files
 
 Expert Iceberg users may choose to commit existing parquet files to the 
Iceberg table as data files, without rewriting them.
 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 0f113f3b..e183d827 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -47,6 +47,7 @@ from sortedcontainers import SortedList
 from typing_extensions import Annotated
 
 import pyiceberg.expressions.parser as parser
+from pyiceberg.conversions import from_bytes
 from pyiceberg.exceptions import CommitFailedException, ResolveError, 
ValidationError
 from pyiceberg.expressions import (
     AlwaysTrue,
@@ -3264,3 +3265,126 @@ class InspectTable:
             snapshots,
             schema=snapshots_schema,
         )
+
+    def entries(self) -> "pa.Table":
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+        schema = self.tbl.metadata.schema()
+
+        readable_metrics_struct = []
+
+        def _readable_metrics_struct(bound_type: PrimitiveType) -> 
pa.StructType:
+            pa_bound_type = schema_to_pyarrow(bound_type)
+            return pa.struct([
+                pa.field("column_size", pa.int64(), nullable=True),
+                pa.field("value_count", pa.int64(), nullable=True),
+                pa.field("null_value_count", pa.int64(), nullable=True),
+                pa.field("nan_value_count", pa.int64(), nullable=True),
+                pa.field("lower_bound", pa_bound_type, nullable=True),
+                pa.field("upper_bound", pa_bound_type, nullable=True),
+            ])
+
+        for field in self.tbl.metadata.schema().fields:
+            readable_metrics_struct.append(
+                pa.field(schema.find_column_name(field.field_id), 
_readable_metrics_struct(field.field_type), nullable=False)
+            )
+
+        partition_record = self.tbl.metadata.specs_struct()
+        pa_record_struct = schema_to_pyarrow(partition_record)
+
+        entries_schema = pa.schema([
+            pa.field('status', pa.int8(), nullable=False),
+            pa.field('snapshot_id', pa.int64(), nullable=False),
+            pa.field('sequence_number', pa.int64(), nullable=False),
+            pa.field('file_sequence_number', pa.int64(), nullable=False),
+            pa.field(
+                'data_file',
+                pa.struct([
+                    pa.field('content', pa.int8(), nullable=False),
+                    pa.field('file_path', pa.string(), nullable=False),
+                    pa.field('file_format', pa.string(), nullable=False),
+                    pa.field('partition', pa_record_struct, nullable=False),
+                    pa.field('record_count', pa.int64(), nullable=False),
+                    pa.field('file_size_in_bytes', pa.int64(), nullable=False),
+                    pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), 
nullable=True),
+                    pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), 
nullable=True),
+                    pa.field('null_value_counts', pa.map_(pa.int32(), 
pa.int64()), nullable=True),
+                    pa.field('nan_value_counts', pa.map_(pa.int32(), 
pa.int64()), nullable=True),
+                    pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), 
nullable=True),
+                    pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), 
nullable=True),
+                    pa.field('key_metadata', pa.binary(), nullable=True),
+                    pa.field('split_offsets', pa.list_(pa.int64()), 
nullable=True),
+                    pa.field('equality_ids', pa.list_(pa.int32()), 
nullable=True),
+                    pa.field('sort_order_id', pa.int32(), nullable=True),
+                ]),
+                nullable=False,
+            ),
+            pa.field('readable_metrics', pa.struct(readable_metrics_struct), 
nullable=True),
+        ])
+
+        entries = []
+        if snapshot := self.tbl.metadata.current_snapshot():
+            for manifest in snapshot.manifests(self.tbl.io):
+                for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+                    column_sizes = entry.data_file.column_sizes or {}
+                    value_counts = entry.data_file.value_counts or {}
+                    null_value_counts = entry.data_file.null_value_counts or {}
+                    nan_value_counts = entry.data_file.nan_value_counts or {}
+                    lower_bounds = entry.data_file.lower_bounds or {}
+                    upper_bounds = entry.data_file.upper_bounds or {}
+                    readable_metrics = {
+                        schema.find_column_name(field.field_id): {
+                            "column_size": column_sizes.get(field.field_id),
+                            "value_count": value_counts.get(field.field_id),
+                            "null_value_count": 
null_value_counts.get(field.field_id),
+                            "nan_value_count": 
nan_value_counts.get(field.field_id),
+                            # Makes them readable
+                            "lower_bound": from_bytes(field.field_type, 
lower_bound)
+                            if (lower_bound := 
lower_bounds.get(field.field_id))
+                            else None,
+                            "upper_bound": from_bytes(field.field_type, 
upper_bound)
+                            if (upper_bound := 
upper_bounds.get(field.field_id))
+                            else None,
+                        }
+                        for field in self.tbl.metadata.schema().fields
+                    }
+
+                    partition = entry.data_file.partition
+                    partition_record_dict = {
+                        field.name: partition[pos]
+                        for pos, field in 
enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+                    }
+
+                    entries.append({
+                        'status': entry.status.value,
+                        'snapshot_id': entry.snapshot_id,
+                        'sequence_number': entry.data_sequence_number,
+                        'file_sequence_number': entry.file_sequence_number,
+                        'data_file': {
+                            "content": entry.data_file.content,
+                            "file_path": entry.data_file.file_path,
+                            "file_format": entry.data_file.file_format,
+                            "partition": partition_record_dict,
+                            "record_count": entry.data_file.record_count,
+                            "file_size_in_bytes": 
entry.data_file.file_size_in_bytes,
+                            "column_sizes": dict(entry.data_file.column_sizes),
+                            "value_counts": dict(entry.data_file.value_counts),
+                            "null_value_counts": 
dict(entry.data_file.null_value_counts),
+                            "nan_value_counts": 
entry.data_file.nan_value_counts,
+                            "lower_bounds": entry.data_file.lower_bounds,
+                            "upper_bounds": entry.data_file.upper_bounds,
+                            "key_metadata": entry.data_file.key_metadata,
+                            "split_offsets": entry.data_file.split_offsets,
+                            "equality_ids": entry.data_file.equality_ids,
+                            "sort_order_id": entry.data_file.sort_order_id,
+                            "spec_id": entry.data_file.spec_id,
+                        },
+                        'readable_metrics': readable_metrics,
+                    })
+
+        return pa.Table.from_pylist(
+            entries,
+            schema=entries_schema,
+        )
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 2e20c509..21ed1447 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -49,7 +49,7 @@ from pyiceberg.typedef import (
     IcebergRootModel,
     Properties,
 )
-from pyiceberg.types import transform_dict_value_to_str
+from pyiceberg.types import NestedField, StructType, 
transform_dict_value_to_str
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import datetime_to_millis
 
@@ -245,6 +245,31 @@ class TableMetadataCommonFields(IcebergBaseModel):
         """Return a dict the partition specs this table."""
         return {spec.spec_id: spec for spec in self.partition_specs}
 
+    def specs_struct(self) -> StructType:
+        """Produce a struct of all the combined PartitionSpecs.
+
+        The partition fields should be optional: Partition fields may be added 
later,
+        in which case not all files would have the result field, and it may be 
null.
+
+        :return: A StructType that represents all the combined PartitionSpecs 
of the table
+        """
+        specs = self.specs()
+
+        # Collect all the fields
+        struct_fields = {field.field_id: field for spec in specs.values() for 
field in spec.fields}
+
+        schema = self.schema()
+
+        nested_fields = []
+        # Sort them by field_id in order to get a deterministic output
+        for field_id in sorted(struct_fields):
+            field = struct_fields[field_id]
+            source_type = schema.find_type(field.source_id)
+            result_type = field.transform.result_type(source_type)
+            nested_fields.append(NestedField(field_id=field.field_id, 
name=field.name, type=result_type, required=False))
+
+        return StructType(*nested_fields)
+
     def new_snapshot_id(self) -> int:
         """Generate a new snapshot-id that's not in use."""
         snapshot_id = _generate_snapshot_id()
diff --git a/pyiceberg/utils/lazydict.py b/pyiceberg/utils/lazydict.py
index dfe251c0..ea70c78c 100644
--- a/pyiceberg/utils/lazydict.py
+++ b/pyiceberg/utils/lazydict.py
@@ -66,3 +66,7 @@ class LazyDict(Mapping[K, V]):
         """Return the number of items in the dictionary."""
         source = self._dict or self._build_dict()
         return len(source)
+
+    def __dict__(self) -> Dict[K, V]:  # type: ignore
+        """Convert the lazy dict in a dict."""
+        return self._dict or self._build_dict()
diff --git a/tests/conftest.py b/tests/conftest.py
index 8c381a1c..aa09517b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2043,5 +2043,5 @@ def pa_schema() -> "pa.Schema":
 def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
     import pyarrow as pa
 
-    """PyArrow table with all kinds of columns"""
+    """Pyarrow table with all kinds of columns."""
     return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
diff --git a/tests/integration/test_inspect_table.py 
b/tests/integration/test_inspect_table.py
new file mode 100644
index 00000000..f2515cae
--- /dev/null
+++ b/tests/integration/test_inspect_table.py
@@ -0,0 +1,268 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import math
+from datetime import date, datetime
+
+import pyarrow as pa
+import pytest
+import pytz
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.typedef import Properties
+from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DoubleType,
+    FixedType,
+    FloatType,
+    IntegerType,
+    LongType,
+    NestedField,
+    StringType,
+    TimestampType,
+    TimestamptzType,
+)
+
+TABLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="bool", field_type=BooleanType(), 
required=False),
+    NestedField(field_id=2, name="string", field_type=StringType(), 
required=False),
+    NestedField(field_id=3, name="string_long", field_type=StringType(), 
required=False),
+    NestedField(field_id=4, name="int", field_type=IntegerType(), 
required=False),
+    NestedField(field_id=5, name="long", field_type=LongType(), 
required=False),
+    NestedField(field_id=6, name="float", field_type=FloatType(), 
required=False),
+    NestedField(field_id=7, name="double", field_type=DoubleType(), 
required=False),
+    NestedField(field_id=8, name="timestamp", field_type=TimestampType(), 
required=False),
+    NestedField(field_id=9, name="timestamptz", field_type=TimestamptzType(), 
required=False),
+    NestedField(field_id=10, name="date", field_type=DateType(), 
required=False),
+    # NestedField(field_id=11, name="time", field_type=TimeType(), 
required=False),
+    # NestedField(field_id=12, name="uuid", field_type=UuidType(), 
required=False),
+    NestedField(field_id=12, name="binary", field_type=BinaryType(), 
required=False),
+    NestedField(field_id=13, name="fixed", field_type=FixedType(16), 
required=False),
+)
+
+
+def _create_table(session_catalog: Catalog, identifier: str, properties: 
Properties) -> Table:
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    return session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties=properties)
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_snapshots(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_snapshots"
+    tbl = _create_table(session_catalog, identifier, 
properties={"format-version": format_version})
+
+    tbl.overwrite(arrow_table_with_null)
+    # should produce a DELETE entry
+    tbl.overwrite(arrow_table_with_null)
+    # Since we don't rewrite, this should produce a new manifest with an ADDED 
entry
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.snapshots()
+
+    assert df.column_names == [
+        'committed_at',
+        'snapshot_id',
+        'parent_id',
+        'operation',
+        'manifest_list',
+        'summary',
+    ]
+
+    for committed_at in df['committed_at']:
+        assert isinstance(committed_at.as_py(), datetime)
+
+    for snapshot_id in df['snapshot_id']:
+        assert isinstance(snapshot_id.as_py(), int)
+
+    assert df['parent_id'][0].as_py() is None
+    assert df['parent_id'][1:] == df['snapshot_id'][:2]
+
+    assert [operation.as_py() for operation in df['operation']] == ['append', 
'overwrite', 'append']
+
+    for manifest_list in df['manifest_list']:
+        assert manifest_list.as_py().startswith("s3://")
+
+    assert df['summary'][0].as_py() == [
+        ('added-files-size', '5459'),
+        ('added-data-files', '1'),
+        ('added-records', '3'),
+        ('total-data-files', '1'),
+        ('total-delete-files', '0'),
+        ('total-records', '3'),
+        ('total-files-size', '5459'),
+        ('total-position-deletes', '0'),
+        ('total-equality-deletes', '0'),
+    ]
+
+    lhs = spark.table(f"{identifier}.snapshots").toPandas()
+    rhs = df.to_pandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if column == 'summary':
+                # Arrow returns a list of tuples, instead of a dict
+                right = dict(right)
+
+            if isinstance(left, float) and math.isnan(left) and 
isinstance(right, float) and math.isnan(right):
+                # NaN != NaN in Python
+                continue
+
+            assert left == right, f"Difference in column {column}: {left} != 
{right}"
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_entries(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, format_version: int
+) -> None:
+    identifier = "default.table_metadata_entries"
+    tbl = _create_table(session_catalog, identifier, 
properties={"format-version": format_version})
+
+    # Write some data
+    tbl.append(arrow_table_with_null)
+
+    df = tbl.inspect.entries()
+
+    assert df.column_names == [
+        'status',
+        'snapshot_id',
+        'sequence_number',
+        'file_sequence_number',
+        'data_file',
+        'readable_metrics',
+    ]
+
+    # Make sure that they are filled properly
+    for int_column in ['status', 'snapshot_id', 'sequence_number', 
'file_sequence_number']:
+        for value in df[int_column]:
+            assert isinstance(value.as_py(), int)
+
+    for snapshot_id in df['snapshot_id']:
+        assert isinstance(snapshot_id.as_py(), int)
+
+    lhs = df.to_pandas()
+    rhs = spark.table(f"{identifier}.entries").toPandas()
+    for column in df.column_names:
+        for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+            if column == 'data_file':
+                right = right.asDict(recursive=True)
+                for df_column in left.keys():
+                    if df_column == 'partition':
+                        # Spark leaves out the partition if the table is 
unpartitioned
+                        continue
+
+                    df_lhs = left[df_column]
+                    df_rhs = right[df_column]
+                    if isinstance(df_rhs, dict):
+                        # Arrow turns dicts into lists of tuple
+                        df_lhs = dict(df_lhs)
+
+                    assert df_lhs == df_rhs, f"Difference in data_file column 
{df_column}: {df_lhs} != {df_rhs}"
+            elif column == 'readable_metrics':
+                right = right.asDict(recursive=True)
+
+                assert list(left.keys()) == [
+                    'bool',
+                    'string',
+                    'string_long',
+                    'int',
+                    'long',
+                    'float',
+                    'double',
+                    'timestamp',
+                    'timestamptz',
+                    'date',
+                    'binary',
+                    'fixed',
+                ]
+
+                assert left.keys() == right.keys()
+
+                for rm_column in left.keys():
+                    rm_lhs = left[rm_column]
+                    rm_rhs = right[rm_column]
+
+                    assert rm_lhs['column_size'] == rm_rhs['column_size']
+                    assert rm_lhs['value_count'] == rm_rhs['value_count']
+                    assert rm_lhs['null_value_count'] == 
rm_rhs['null_value_count']
+                    assert rm_lhs['nan_value_count'] == 
rm_rhs['nan_value_count']
+
+                    if rm_column == 'timestamptz':
+                        # PySpark does not correctly set the timstamptz
+                        rm_rhs['lower_bound'] = 
rm_rhs['lower_bound'].replace(tzinfo=pytz.utc)
+                        rm_rhs['upper_bound'] = 
rm_rhs['upper_bound'].replace(tzinfo=pytz.utc)
+
+                    assert rm_lhs['lower_bound'] == rm_rhs['lower_bound']
+                    assert rm_lhs['upper_bound'] == rm_rhs['upper_bound']
+            else:
+                assert left == right, f"Difference in column {column}: {left} 
!= {right}"
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_entries_partitioned(spark: SparkSession, session_catalog: 
Catalog, format_version: int) -> None:
+    identifier = "default.table_metadata_entries_partitioned"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            dt date
+        )
+        PARTITIONED BY (months(dt))
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (CAST('2021-01-01' AS date))
+    """
+    )
+
+    spark.sql(
+        f"""
+        ALTER TABLE {identifier}
+        REPLACE PARTITION FIELD dt_month WITH days(dt)
+    """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO {identifier} VALUES (CAST('2021-02-01' AS date))
+    """
+    )
+
+    df = session_catalog.load_table(identifier).inspect.entries()
+
+    assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day': 
date(2021, 2, 1), 'dt_month': None}
+    assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None, 
'dt_month': 612}
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 77567023..e950fb43 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -96,34 +96,6 @@ TABLE_SCHEMA = Schema(
 )
 
 
[email protected](scope="session")
-def pa_schema() -> pa.Schema:
-    return pa.schema([
-        ("bool", pa.bool_()),
-        ("string", pa.string()),
-        ("string_long", pa.string()),
-        ("int", pa.int32()),
-        ("long", pa.int64()),
-        ("float", pa.float32()),
-        ("double", pa.float64()),
-        ("timestamp", pa.timestamp(unit="us")),
-        ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
-        ("date", pa.date32()),
-        # Not supported by Spark
-        # ("time", pa.time64("us")),
-        # Not natively supported by Arrow
-        # ("uuid", pa.fixed(16)),
-        ("binary", pa.large_binary()),
-        ("fixed", pa.binary(16)),
-    ])
-
-
[email protected](scope="session")
-def arrow_table_with_null(pa_schema: pa.Schema) -> pa.Table:
-    """PyArrow table with all kinds of columns"""
-    return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
-
-
 @pytest.fixture(scope="session")
 def arrow_table_without_data(pa_schema: pa.Schema) -> pa.Table:
     """PyArrow table with all kinds of columns"""

Reply via email to