This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new ee4dd920 Add entries metadata table (#551)
ee4dd920 is described below
commit ee4dd92008ffa21067837fe9fcdfaea063a514f3
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Apr 4 20:46:23 2024 +0200
Add entries metadata table (#551)
* Add entries metadata table
* lint
* Revert typedef changes
* Remove unrelated changes
* Fix the CI
* Add docs
---
mkdocs/docs/api.md | 160 ++++++++++++++++++-
pyiceberg/table/__init__.py | 124 +++++++++++++++
pyiceberg/table/metadata.py | 27 +++-
pyiceberg/utils/lazydict.py | 4 +
tests/conftest.py | 2 +-
tests/integration/test_inspect_table.py | 268 ++++++++++++++++++++++++++++++++
tests/integration/test_writes.py | 28 ----
7 files changed, 582 insertions(+), 31 deletions(-)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index c8620af7..15931d02 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -370,7 +370,165 @@ manifest_list:
[["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
summary:
[[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","54
[...]
```
-### Add Files
+### Entries
+
+To show all of the table's current manifest entries, for both data files and delete files:
+
+```python
+table.inspect.entries()
+```
+
+```
+pyarrow.Table
+status: int8 not null
+snapshot_id: int64 not null
+sequence_number: int64 not null
+file_sequence_number: int64 not null
+data_file: struct<content: int8 not null, file_path: string not null,
file_format: string not null, partition: struct<> not null, record_count: int64
not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>,
value_counts: map<int32, int64>, null_value_counts: map<int32, int64>,
nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>,
upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets:
list<item: int64>, equality_ids: list<item: int32> [...]
+ child 0, content: int8 not null
+ child 1, file_path: string not null
+ child 2, file_format: string not null
+ child 3, partition: struct<> not null
+ child 4, record_count: int64 not null
+ child 5, file_size_in_bytes: int64 not null
+ child 6, column_sizes: map<int32, int64>
+ child 0, entries: struct<key: int32 not null, value: int64> not null
+ child 0, key: int32 not null
+ child 1, value: int64
+ child 7, value_counts: map<int32, int64>
+ child 0, entries: struct<key: int32 not null, value: int64> not null
+ child 0, key: int32 not null
+ child 1, value: int64
+ child 8, null_value_counts: map<int32, int64>
+ child 0, entries: struct<key: int32 not null, value: int64> not null
+ child 0, key: int32 not null
+ child 1, value: int64
+ child 9, nan_value_counts: map<int32, int64>
+ child 0, entries: struct<key: int32 not null, value: int64> not null
+ child 0, key: int32 not null
+ child 1, value: int64
+ child 10, lower_bounds: map<int32, binary>
+ child 0, entries: struct<key: int32 not null, value: binary> not null
+ child 0, key: int32 not null
+ child 1, value: binary
+ child 11, upper_bounds: map<int32, binary>
+ child 0, entries: struct<key: int32 not null, value: binary> not null
+ child 0, key: int32 not null
+ child 1, value: binary
+ child 12, key_metadata: binary
+ child 13, split_offsets: list<item: int64>
+ child 0, item: int64
+ child 14, equality_ids: list<item: int32>
+ child 0, item: int32
+ child 15, sort_order_id: int32
+readable_metrics: struct<city: struct<column_size: int64, value_count: int64,
null_value_count: int64, nan_value_count: int64, lower_bound: string,
upper_bound: string> not null, lat: struct<column_size: int64, value_count:
int64, null_value_count: int64, nan_value_count: int64, lower_bound: double,
upper_bound: double> not null, long: struct<column_size: int64, value_count:
int64, null_value_count: int64, nan_value_count: int64, lower_bound: double,
upper_bound: double> not null>
+ child 0, city: struct<column_size: int64, value_count: int64,
null_value_count: int64, nan_value_count: int64, lower_bound: string,
upper_bound: string> not null
+ child 0, column_size: int64
+ child 1, value_count: int64
+ child 2, null_value_count: int64
+ child 3, nan_value_count: int64
+ child 4, lower_bound: string
+ child 5, upper_bound: string
+ child 1, lat: struct<column_size: int64, value_count: int64,
null_value_count: int64, nan_value_count: int64, lower_bound: double,
upper_bound: double> not null
+ child 0, column_size: int64
+ child 1, value_count: int64
+ child 2, null_value_count: int64
+ child 3, nan_value_count: int64
+ child 4, lower_bound: double
+ child 5, upper_bound: double
+ child 2, long: struct<column_size: int64, value_count: int64,
null_value_count: int64, nan_value_count: int64, lower_bound: double,
upper_bound: double> not null
+ child 0, column_size: int64
+ child 1, value_count: int64
+ child 2, null_value_count: int64
+ child 3, nan_value_count: int64
+ child 4, lower_bound: double
+ child 5, upper_bound: double
+----
+status: [[1]]
+snapshot_id: [[6245626162224016531]]
+sequence_number: [[1]]
+file_sequence_number: [[1]]
+data_file: [
+ -- is_valid: all not null
+ -- child 0 type: int8
+[0]
+ -- child 1 type: string
+["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
+ -- child 2 type: string
+["PARQUET"]
+ -- child 3 type: struct<>
+ -- is_valid: all not null
+ -- child 4 type: int64
+[4]
+ -- child 5 type: int64
+[1656]
+ -- child 6 type: map<int32, int64>
+[keys:[1,2,3]values:[140,135,135]]
+ -- child 7 type: map<int32, int64>
+[keys:[1,2,3]values:[4,4,4]]
+ -- child 8 type: map<int32, int64>
+[keys:[1,2,3]values:[0,0,0]]
+ -- child 9 type: map<int32, int64>
+[keys:[]values:[]]
+ -- child 10 type: map<int32, binary>
+[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
+ -- child 11 type: map<int32, binary>
+[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
+ -- child 12 type: binary
+[null]
+ -- child 13 type: list<item: int64>
+[[4]]
+ -- child 14 type: list<item: int32>
+[null]
+ -- child 15 type: int32
+[null]]
+readable_metrics: [
+ -- is_valid: all not null
+ -- child 0 type: struct<column_size: int64, value_count: int64,
null_value_count: int64, nan_value_count: int64, lower_bound: string,
upper_bound: string>
+ -- is_valid: all not null
+ -- child 0 type: int64
+[140]
+ -- child 1 type: int64
+[4]
+ -- child 2 type: int64
+[0]
+ -- child 3 type: int64
+[null]
+ -- child 4 type: string
+["Amsterdam"]
+ -- child 5 type: string
+["San Francisco"]
+ -- child 1 type: struct<column_size: int64, value_count: int64,
null_value_count: int64, nan_value_count: int64, lower_bound: double,
upper_bound: double>
+ -- is_valid: all not null
+ -- child 0 type: int64
+[135]
+ -- child 1 type: int64
+[4]
+ -- child 2 type: int64
+[0]
+ -- child 3 type: int64
+[null]
+ -- child 4 type: double
+[37.773972]
+ -- child 5 type: double
+[53.11254]
+ -- child 2 type: struct<column_size: int64, value_count: int64,
null_value_count: int64, nan_value_count: int64, lower_bound: double,
upper_bound: double>
+ -- is_valid: all not null
+ -- child 0 type: int64
+[135]
+ -- child 1 type: int64
+[4]
+ -- child 2 type: int64
+[0]
+ -- child 3 type: int64
+[null]
+ -- child 4 type: double
+[-122.431297]
+ -- child 5 type: double
+[6.0989]]
+```
+
+## Add Files
Expert Iceberg users may choose to commit existing parquet files to the
Iceberg table as data files, without rewriting them.
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 0f113f3b..e183d827 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -47,6 +47,7 @@ from sortedcontainers import SortedList
from typing_extensions import Annotated
import pyiceberg.expressions.parser as parser
+from pyiceberg.conversions import from_bytes
from pyiceberg.exceptions import CommitFailedException, ResolveError,
ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
@@ -3264,3 +3265,126 @@ class InspectTable:
snapshots,
schema=snapshots_schema,
)
+
+ def entries(self) -> "pa.Table":
+ import pyarrow as pa
+
+ from pyiceberg.io.pyarrow import schema_to_pyarrow
+
+ schema = self.tbl.metadata.schema()
+
+ readable_metrics_struct = []
+
+ def _readable_metrics_struct(bound_type: PrimitiveType) ->
pa.StructType:
+ pa_bound_type = schema_to_pyarrow(bound_type)
+ return pa.struct([
+ pa.field("column_size", pa.int64(), nullable=True),
+ pa.field("value_count", pa.int64(), nullable=True),
+ pa.field("null_value_count", pa.int64(), nullable=True),
+ pa.field("nan_value_count", pa.int64(), nullable=True),
+ pa.field("lower_bound", pa_bound_type, nullable=True),
+ pa.field("upper_bound", pa_bound_type, nullable=True),
+ ])
+
+ for field in self.tbl.metadata.schema().fields:
+ readable_metrics_struct.append(
+ pa.field(schema.find_column_name(field.field_id),
_readable_metrics_struct(field.field_type), nullable=False)
+ )
+
+ partition_record = self.tbl.metadata.specs_struct()
+ pa_record_struct = schema_to_pyarrow(partition_record)
+
+ entries_schema = pa.schema([
+ pa.field('status', pa.int8(), nullable=False),
+ pa.field('snapshot_id', pa.int64(), nullable=False),
+ pa.field('sequence_number', pa.int64(), nullable=False),
+ pa.field('file_sequence_number', pa.int64(), nullable=False),
+ pa.field(
+ 'data_file',
+ pa.struct([
+ pa.field('content', pa.int8(), nullable=False),
+ pa.field('file_path', pa.string(), nullable=False),
+ pa.field('file_format', pa.string(), nullable=False),
+ pa.field('partition', pa_record_struct, nullable=False),
+ pa.field('record_count', pa.int64(), nullable=False),
+ pa.field('file_size_in_bytes', pa.int64(), nullable=False),
+ pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()),
nullable=True),
+ pa.field('value_counts', pa.map_(pa.int32(), pa.int64()),
nullable=True),
+ pa.field('null_value_counts', pa.map_(pa.int32(),
pa.int64()), nullable=True),
+ pa.field('nan_value_counts', pa.map_(pa.int32(),
pa.int64()), nullable=True),
+ pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()),
nullable=True),
+ pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()),
nullable=True),
+ pa.field('key_metadata', pa.binary(), nullable=True),
+ pa.field('split_offsets', pa.list_(pa.int64()),
nullable=True),
+ pa.field('equality_ids', pa.list_(pa.int32()),
nullable=True),
+ pa.field('sort_order_id', pa.int32(), nullable=True),
+ ]),
+ nullable=False,
+ ),
+ pa.field('readable_metrics', pa.struct(readable_metrics_struct),
nullable=True),
+ ])
+
+ entries = []
+ if snapshot := self.tbl.metadata.current_snapshot():
+ for manifest in snapshot.manifests(self.tbl.io):
+ for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
+ column_sizes = entry.data_file.column_sizes or {}
+ value_counts = entry.data_file.value_counts or {}
+ null_value_counts = entry.data_file.null_value_counts or {}
+ nan_value_counts = entry.data_file.nan_value_counts or {}
+ lower_bounds = entry.data_file.lower_bounds or {}
+ upper_bounds = entry.data_file.upper_bounds or {}
+ readable_metrics = {
+ schema.find_column_name(field.field_id): {
+ "column_size": column_sizes.get(field.field_id),
+ "value_count": value_counts.get(field.field_id),
+ "null_value_count":
null_value_counts.get(field.field_id),
+ "nan_value_count":
nan_value_counts.get(field.field_id),
+ # Makes them readable
+ "lower_bound": from_bytes(field.field_type,
lower_bound)
+ if (lower_bound :=
lower_bounds.get(field.field_id))
+ else None,
+ "upper_bound": from_bytes(field.field_type,
upper_bound)
+ if (upper_bound :=
upper_bounds.get(field.field_id))
+ else None,
+ }
+ for field in self.tbl.metadata.schema().fields
+ }
+
+ partition = entry.data_file.partition
+ partition_record_dict = {
+ field.name: partition[pos]
+ for pos, field in
enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
+ }
+
+ entries.append({
+ 'status': entry.status.value,
+ 'snapshot_id': entry.snapshot_id,
+ 'sequence_number': entry.data_sequence_number,
+ 'file_sequence_number': entry.file_sequence_number,
+ 'data_file': {
+ "content": entry.data_file.content,
+ "file_path": entry.data_file.file_path,
+ "file_format": entry.data_file.file_format,
+ "partition": partition_record_dict,
+ "record_count": entry.data_file.record_count,
+ "file_size_in_bytes":
entry.data_file.file_size_in_bytes,
+ "column_sizes": dict(entry.data_file.column_sizes),
+ "value_counts": dict(entry.data_file.value_counts),
+ "null_value_counts":
dict(entry.data_file.null_value_counts),
+ "nan_value_counts":
entry.data_file.nan_value_counts,
+ "lower_bounds": entry.data_file.lower_bounds,
+ "upper_bounds": entry.data_file.upper_bounds,
+ "key_metadata": entry.data_file.key_metadata,
+ "split_offsets": entry.data_file.split_offsets,
+ "equality_ids": entry.data_file.equality_ids,
+ "sort_order_id": entry.data_file.sort_order_id,
+ "spec_id": entry.data_file.spec_id,
+ },
+ 'readable_metrics': readable_metrics,
+ })
+
+ return pa.Table.from_pylist(
+ entries,
+ schema=entries_schema,
+ )
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 2e20c509..21ed1447 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -49,7 +49,7 @@ from pyiceberg.typedef import (
IcebergRootModel,
Properties,
)
-from pyiceberg.types import transform_dict_value_to_str
+from pyiceberg.types import NestedField, StructType,
transform_dict_value_to_str
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import datetime_to_millis
@@ -245,6 +245,31 @@ class TableMetadataCommonFields(IcebergBaseModel):
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.partition_specs}
+ def specs_struct(self) -> StructType:
+ """Produce a struct of all the combined PartitionSpecs.
+
+ The partition fields should be optional: Partition fields may be added
later,
+ in which case not all files would have the result field, and it may be
null.
+
+ :return: A StructType that represents all the combined PartitionSpecs
of the table
+ """
+ specs = self.specs()
+
+ # Collect all the fields
+ struct_fields = {field.field_id: field for spec in specs.values() for
field in spec.fields}
+
+ schema = self.schema()
+
+ nested_fields = []
+ # Sort them by field_id in order to get a deterministic output
+ for field_id in sorted(struct_fields):
+ field = struct_fields[field_id]
+ source_type = schema.find_type(field.source_id)
+ result_type = field.transform.result_type(source_type)
+ nested_fields.append(NestedField(field_id=field.field_id,
name=field.name, type=result_type, required=False))
+
+ return StructType(*nested_fields)
+
def new_snapshot_id(self) -> int:
"""Generate a new snapshot-id that's not in use."""
snapshot_id = _generate_snapshot_id()
diff --git a/pyiceberg/utils/lazydict.py b/pyiceberg/utils/lazydict.py
index dfe251c0..ea70c78c 100644
--- a/pyiceberg/utils/lazydict.py
+++ b/pyiceberg/utils/lazydict.py
@@ -66,3 +66,7 @@ class LazyDict(Mapping[K, V]):
"""Return the number of items in the dictionary."""
source = self._dict or self._build_dict()
return len(source)
+
+ def __dict__(self) -> Dict[K, V]: # type: ignore
+ """Convert the lazy dict into a dict."""
+ return self._dict or self._build_dict()
diff --git a/tests/conftest.py b/tests/conftest.py
index 8c381a1c..aa09517b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2043,5 +2043,5 @@ def pa_schema() -> "pa.Schema":
def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
import pyarrow as pa
- """PyArrow table with all kinds of columns"""
+ """Pyarrow table with all kinds of columns."""
return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
diff --git a/tests/integration/test_inspect_table.py
b/tests/integration/test_inspect_table.py
new file mode 100644
index 00000000..f2515cae
--- /dev/null
+++ b/tests/integration/test_inspect_table.py
@@ -0,0 +1,268 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import math
+from datetime import date, datetime
+
+import pyarrow as pa
+import pytest
+import pytz
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.typedef import Properties
+from pyiceberg.types import (
+ BinaryType,
+ BooleanType,
+ DateType,
+ DoubleType,
+ FixedType,
+ FloatType,
+ IntegerType,
+ LongType,
+ NestedField,
+ StringType,
+ TimestampType,
+ TimestamptzType,
+)
+
+TABLE_SCHEMA = Schema(
+ NestedField(field_id=1, name="bool", field_type=BooleanType(),
required=False),
+ NestedField(field_id=2, name="string", field_type=StringType(),
required=False),
+ NestedField(field_id=3, name="string_long", field_type=StringType(),
required=False),
+ NestedField(field_id=4, name="int", field_type=IntegerType(),
required=False),
+ NestedField(field_id=5, name="long", field_type=LongType(),
required=False),
+ NestedField(field_id=6, name="float", field_type=FloatType(),
required=False),
+ NestedField(field_id=7, name="double", field_type=DoubleType(),
required=False),
+ NestedField(field_id=8, name="timestamp", field_type=TimestampType(),
required=False),
+ NestedField(field_id=9, name="timestamptz", field_type=TimestamptzType(),
required=False),
+ NestedField(field_id=10, name="date", field_type=DateType(),
required=False),
+ # NestedField(field_id=11, name="time", field_type=TimeType(),
required=False),
+ # NestedField(field_id=12, name="uuid", field_type=UuidType(),
required=False),
+ NestedField(field_id=12, name="binary", field_type=BinaryType(),
required=False),
+ NestedField(field_id=13, name="fixed", field_type=FixedType(16),
required=False),
+)
+
+
+def _create_table(session_catalog: Catalog, identifier: str, properties:
Properties) -> Table:
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ return session_catalog.create_table(identifier=identifier,
schema=TABLE_SCHEMA, properties=properties)
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_snapshots(
+ spark: SparkSession, session_catalog: Catalog, arrow_table_with_null:
pa.Table, format_version: int
+) -> None:
+ identifier = "default.table_metadata_snapshots"
+ tbl = _create_table(session_catalog, identifier,
properties={"format-version": format_version})
+
+ tbl.overwrite(arrow_table_with_null)
+ # should produce a DELETE entry
+ tbl.overwrite(arrow_table_with_null)
+ # Since we don't rewrite, this should produce a new manifest with an ADDED
entry
+ tbl.append(arrow_table_with_null)
+
+ df = tbl.inspect.snapshots()
+
+ assert df.column_names == [
+ 'committed_at',
+ 'snapshot_id',
+ 'parent_id',
+ 'operation',
+ 'manifest_list',
+ 'summary',
+ ]
+
+ for committed_at in df['committed_at']:
+ assert isinstance(committed_at.as_py(), datetime)
+
+ for snapshot_id in df['snapshot_id']:
+ assert isinstance(snapshot_id.as_py(), int)
+
+ assert df['parent_id'][0].as_py() is None
+ assert df['parent_id'][1:] == df['snapshot_id'][:2]
+
+ assert [operation.as_py() for operation in df['operation']] == ['append',
'overwrite', 'append']
+
+ for manifest_list in df['manifest_list']:
+ assert manifest_list.as_py().startswith("s3://")
+
+ assert df['summary'][0].as_py() == [
+ ('added-files-size', '5459'),
+ ('added-data-files', '1'),
+ ('added-records', '3'),
+ ('total-data-files', '1'),
+ ('total-delete-files', '0'),
+ ('total-records', '3'),
+ ('total-files-size', '5459'),
+ ('total-position-deletes', '0'),
+ ('total-equality-deletes', '0'),
+ ]
+
+ lhs = spark.table(f"{identifier}.snapshots").toPandas()
+ rhs = df.to_pandas()
+ for column in df.column_names:
+ for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+ if column == 'summary':
+ # Arrow returns a list of tuples, instead of a dict
+ right = dict(right)
+
+ if isinstance(left, float) and math.isnan(left) and
isinstance(right, float) and math.isnan(right):
+ # NaN != NaN in Python
+ continue
+
+ assert left == right, f"Difference in column {column}: {left} !=
{right}"
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_entries(
+ spark: SparkSession, session_catalog: Catalog, arrow_table_with_null:
pa.Table, format_version: int
+) -> None:
+ identifier = "default.table_metadata_entries"
+ tbl = _create_table(session_catalog, identifier,
properties={"format-version": format_version})
+
+ # Write some data
+ tbl.append(arrow_table_with_null)
+
+ df = tbl.inspect.entries()
+
+ assert df.column_names == [
+ 'status',
+ 'snapshot_id',
+ 'sequence_number',
+ 'file_sequence_number',
+ 'data_file',
+ 'readable_metrics',
+ ]
+
+ # Make sure that they are filled properly
+ for int_column in ['status', 'snapshot_id', 'sequence_number',
'file_sequence_number']:
+ for value in df[int_column]:
+ assert isinstance(value.as_py(), int)
+
+ for snapshot_id in df['snapshot_id']:
+ assert isinstance(snapshot_id.as_py(), int)
+
+ lhs = df.to_pandas()
+ rhs = spark.table(f"{identifier}.entries").toPandas()
+ for column in df.column_names:
+ for left, right in zip(lhs[column].to_list(), rhs[column].to_list()):
+ if column == 'data_file':
+ right = right.asDict(recursive=True)
+ for df_column in left.keys():
+ if df_column == 'partition':
+ # Spark leaves out the partition if the table is
unpartitioned
+ continue
+
+ df_lhs = left[df_column]
+ df_rhs = right[df_column]
+ if isinstance(df_rhs, dict):
+ # Arrow turns dicts into lists of tuples
+ df_lhs = dict(df_lhs)
+
+ assert df_lhs == df_rhs, f"Difference in data_file column
{df_column}: {df_lhs} != {df_rhs}"
+ elif column == 'readable_metrics':
+ right = right.asDict(recursive=True)
+
+ assert list(left.keys()) == [
+ 'bool',
+ 'string',
+ 'string_long',
+ 'int',
+ 'long',
+ 'float',
+ 'double',
+ 'timestamp',
+ 'timestamptz',
+ 'date',
+ 'binary',
+ 'fixed',
+ ]
+
+ assert left.keys() == right.keys()
+
+ for rm_column in left.keys():
+ rm_lhs = left[rm_column]
+ rm_rhs = right[rm_column]
+
+ assert rm_lhs['column_size'] == rm_rhs['column_size']
+ assert rm_lhs['value_count'] == rm_rhs['value_count']
+ assert rm_lhs['null_value_count'] ==
rm_rhs['null_value_count']
+ assert rm_lhs['nan_value_count'] ==
rm_rhs['nan_value_count']
+
+ if rm_column == 'timestamptz':
+ # PySpark does not correctly set the timezone on timestamptz values
+ rm_rhs['lower_bound'] =
rm_rhs['lower_bound'].replace(tzinfo=pytz.utc)
+ rm_rhs['upper_bound'] =
rm_rhs['upper_bound'].replace(tzinfo=pytz.utc)
+
+ assert rm_lhs['lower_bound'] == rm_rhs['lower_bound']
+ assert rm_lhs['upper_bound'] == rm_rhs['upper_bound']
+ else:
+ assert left == right, f"Difference in column {column}: {left}
!= {right}"
+
+
[email protected]
[email protected]("format_version", [1, 2])
+def test_inspect_entries_partitioned(spark: SparkSession, session_catalog:
Catalog, format_version: int) -> None:
+ identifier = "default.table_metadata_entries_partitioned"
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ spark.sql(
+ f"""
+ CREATE TABLE {identifier} (
+ dt date
+ )
+ PARTITIONED BY (months(dt))
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {identifier} VALUES (CAST('2021-01-01' AS date))
+ """
+ )
+
+ spark.sql(
+ f"""
+ ALTER TABLE {identifier}
+ REPLACE PARTITION FIELD dt_month WITH days(dt)
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO {identifier} VALUES (CAST('2021-02-01' AS date))
+ """
+ )
+
+ df = session_catalog.load_table(identifier).inspect.entries()
+
+ assert df.to_pydict()['data_file'][0]['partition'] == {'dt_day':
date(2021, 2, 1), 'dt_month': None}
+ assert df.to_pydict()['data_file'][1]['partition'] == {'dt_day': None,
'dt_month': 612}
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 77567023..e950fb43 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -96,34 +96,6 @@ TABLE_SCHEMA = Schema(
)
[email protected](scope="session")
-def pa_schema() -> pa.Schema:
- return pa.schema([
- ("bool", pa.bool_()),
- ("string", pa.string()),
- ("string_long", pa.string()),
- ("int", pa.int32()),
- ("long", pa.int64()),
- ("float", pa.float32()),
- ("double", pa.float64()),
- ("timestamp", pa.timestamp(unit="us")),
- ("timestamptz", pa.timestamp(unit="us", tz="UTC")),
- ("date", pa.date32()),
- # Not supported by Spark
- # ("time", pa.time64("us")),
- # Not natively supported by Arrow
- # ("uuid", pa.fixed(16)),
- ("binary", pa.large_binary()),
- ("fixed", pa.binary(16)),
- ])
-
-
[email protected](scope="session")
-def arrow_table_with_null(pa_schema: pa.Schema) -> pa.Table:
- """PyArrow table with all kinds of columns"""
- return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
-
-
@pytest.fixture(scope="session")
def arrow_table_without_data(pa_schema: pa.Schema) -> pa.Table:
"""PyArrow table with all kinds of columns"""