This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8062aef8b6 Python: ManifestWriter and ManifestListWriter (#8622)
8062aef8b6 is described below
commit 8062aef8b68ef18647ea10e01ff6e82a89582fe9
Author: HonahX <[email protected]>
AuthorDate: Fri Sep 29 02:00:29 2023 -0700
Python: ManifestWriter and ManifestListWriter (#8622)
* add ManifestWriter and ManifestListWriter
* fix lint issue
* remove assert, fix format issue
* add prepare_entry to ManifestWriter, remove TODO, fix format issue
* fix some nit issues, add prepare_* methods to ensure the correctness of the data written
* fix format issue
* fix lint issue
* avoid creating too many objects; handle the v1 and v2 data_file_type and the DataFile class properly
* modify tests
* refactor the handling of the two versions of the DataFile record
* add integration tests, fix bugs, change PartitionSummary to a function
* fix format issue
* make DATA_FILE_TYPE_V2 a constant
---
python/pyiceberg/avro/writer.py | 4 +-
python/pyiceberg/manifest.py | 468 +++++++++++++++++++++++++++++-
python/pyiceberg/typedef.py | 1 +
python/tests/avro/test_file.py | 1 +
python/tests/test_integration_manifest.py | 126 ++++++++
python/tests/utils/test_manifest.py | 264 +++++++++++++++++
6 files changed, 855 insertions(+), 9 deletions(-)
diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py
index fba17e8971..ad6a755614 100644
--- a/python/pyiceberg/avro/writer.py
+++ b/python/pyiceberg/avro/writer.py
@@ -34,7 +34,7 @@ from typing import (
from uuid import UUID
from pyiceberg.avro.encoder import BinaryEncoder
-from pyiceberg.types import StructType
+from pyiceberg.typedef import Record
from pyiceberg.utils.decimal import decimal_required_bytes, decimal_to_bytes
from pyiceberg.utils.singleton import Singleton
@@ -160,7 +160,7 @@ class OptionWriter(Writer):
class StructWriter(Writer):
field_writers: Tuple[Writer, ...] = dataclassfield()
- def write(self, encoder: BinaryEncoder, val: StructType) -> None:
+ def write(self, encoder: BinaryEncoder, val: Record) -> None:
for writer, value in zip(self.field_writers, val.record_fields()):
writer.write(encoder, value)
diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py
index 57ec11db77..8bdbfd3524 100644
--- a/python/pyiceberg/manifest.py
+++ b/python/pyiceberg/manifest.py
@@ -14,31 +14,51 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
+import math
+from abc import ABC, abstractmethod
from enum import Enum
+from functools import singledispatch
+from types import TracebackType
from typing import (
Any,
Dict,
Iterator,
List,
+ Literal,
Optional,
+ Type,
)
-from pyiceberg.avro.file import AvroFile
-from pyiceberg.io import FileIO, InputFile
+from pyiceberg.avro.file import AvroFile, AvroOutputFile
+from pyiceberg.conversions import to_bytes
+from pyiceberg.exceptions import ValidationError
+from pyiceberg.io import FileIO, InputFile, OutputFile
+from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.types import (
BinaryType,
BooleanType,
+ DateType,
+ IcebergType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
+ PrimitiveType,
StringType,
StructType,
+ TimestampType,
+ TimestamptzType,
+ TimeType,
)
# Placeholder for a sequence number that has not been assigned yet.
# ManifestListWriterV2.prepare_manifest replaces it with the commit's sequence number.
UNASSIGNED_SEQ = -1
# Value written into the deprecated v1 `block_size_in_bytes` data-file field (64 MiB).
DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024
+
class DataFileContent(int, Enum):
DATA = 0
@@ -79,7 +99,7 @@ class FileFormat(str, Enum):
return f"FileFormat.{self.name}"
-DATA_FILE_TYPE = StructType(
+DATA_FILE_TYPE_V1 = StructType(
NestedField(
field_id=134,
name="content",
@@ -90,7 +110,11 @@ DATA_FILE_TYPE = StructType(
),
NestedField(field_id=100, name="file_path", field_type=StringType(),
required=True, doc="Location URI with FS scheme"),
NestedField(
- field_id=101, name="file_format", field_type=StringType(),
required=True, doc="File format name: avro, orc, or parquet"
+ field_id=101,
+ name="file_format",
+ field_type=StringType(),
+ required=True,
+ doc="File format name: avro, orc, or parquet",
),
NestedField(
field_id=102,
@@ -101,6 +125,13 @@ DATA_FILE_TYPE = StructType(
),
NestedField(field_id=103, name="record_count", field_type=LongType(),
required=True, doc="Number of records in the file"),
NestedField(field_id=104, name="file_size_in_bytes",
field_type=LongType(), required=True, doc="Total file size in bytes"),
+ NestedField(
+ field_id=105,
+ name="block_size_in_bytes",
+ field_type=LongType(),
+ required=False,
+ doc="Deprecated. Always write a default in v1. Do not write in v2.",
+ ),
NestedField(
field_id=108,
name="column_sizes",
@@ -162,6 +193,55 @@ DATA_FILE_TYPE = StructType(
NestedField(field_id=141, name="spec_id", field_type=IntegerType(),
required=False, doc="Partition spec ID"),
)
# The v2 data_file struct is the v1 layout minus the deprecated block_size_in_bytes field (id 105).
DATA_FILE_TYPE_V2 = StructType(*filter(lambda data_file_field: data_file_field.field_id != 105, DATA_FILE_TYPE_V1.fields))
+
+
@singledispatch
def partition_field_to_data_file_partition_field(partition_field_type: IcebergType) -> PrimitiveType:
    """Map a partition field's type to the type declared for it in the data_file.partition struct.

    Dispatches on the runtime type of ``partition_field_type``:
    LongType, DateType, TimeType, TimestampType and TimestamptzType map to IntegerType, and
    every other PrimitiveType maps to itself.

    Raises:
        TypeError: If no registered implementation matches (i.e. the type is not a PrimitiveType).
    """
    raise TypeError(f"Unsupported partition field type: {partition_field_type}")


def _(partition_field_type: PrimitiveType) -> IntegerType:
    # NOTE(review): long/time/timestamp partition values are declared as IntegerType here.
    # Values outside the 32-bit range (e.g. microsecond timestamps) look like they could not
    # round-trip through an Avro "int" schema; confirm this matches the Avro writer's intent.
    return IntegerType()


partition_field_to_data_file_partition_field.register(LongType, _)
partition_field_to_data_file_partition_field.register(DateType, _)
partition_field_to_data_file_partition_field.register(TimeType, _)
partition_field_to_data_file_partition_field.register(TimestampType, _)
partition_field_to_data_file_partition_field.register(TimestamptzType, _)


@partition_field_to_data_file_partition_field.register(PrimitiveType)
def _(partition_field_type: PrimitiveType) -> PrimitiveType:
    # All remaining primitive partition types are declared unchanged.
    return partition_field_type
+
+
def data_file_with_partition(partition_type: StructType, format_version: Literal[1, 2]) -> StructType:
    """Return the data_file struct for ``format_version`` with a concrete partition struct.

    Args:
        partition_type: The table's partition type (e.g. ``spec.partition_type(schema)``).
        format_version: 1 uses the DATA_FILE_TYPE_V1 layout; any other value uses DATA_FILE_TYPE_V2.

    Returns:
        A copy of the version's data_file struct whose ``partition`` field (id 102) is typed by
        ``partition_type``, with each partition field's type mapped through
        ``partition_field_to_data_file_partition_field``.
    """
    data_file_partition_type = StructType(
        *[
            NestedField(
                field_id=field.field_id,
                name=field.name,
                field_type=partition_field_to_data_file_partition_field(field.field_type),
                # Preserve the partition field's nullability. Partition values can be None
                # (PartitionFieldStats tracks contains_null). Omitting `required` would fall back
                # to NestedField's default, presumably required=True, which would declare them non-null.
                required=field.required,
            )
            for field in partition_type.fields
        ]
    )

    partition_field = NestedField(
        field_id=102,
        name="partition",
        field_type=data_file_partition_type,
        required=True,
        doc="Partition data tuple, schema based on the partition spec",
    )
    template_fields = DATA_FILE_TYPE_V1.fields if format_version == 1 else DATA_FILE_TYPE_V2.fields
    return StructType(*[partition_field if field.field_id == 102 else field for field in template_fields])
+
class DataFile(Record):
__slots__ = (
@@ -171,6 +251,7 @@ class DataFile(Record):
"partition",
"record_count",
"file_size_in_bytes",
+ "block_size_in_bytes",
"column_sizes",
"value_counts",
"null_value_counts",
@@ -189,6 +270,7 @@ class DataFile(Record):
partition: Record
record_count: int
file_size_in_bytes: int
+ block_size_in_bytes: Optional[int]
column_sizes: Dict[int, int]
value_counts: Dict[int, int]
null_value_counts: Dict[int, int]
@@ -208,8 +290,11 @@ class DataFile(Record):
value = FileFormat[value]
super().__setattr__(name, value)
- def __init__(self, *data: Any, **named_data: Any) -> None:
- super().__init__(*data, **{"struct": DATA_FILE_TYPE, **named_data})
+ def __init__(self, format_version: Literal[1, 2] = 1, *data: Any,
**named_data: Any) -> None:
+ super().__init__(
+ *data,
+ **{"struct": DATA_FILE_TYPE_V1 if format_version == 1 else
DATA_FILE_TYPE_V2, **named_data},
+ )
def __hash__(self) -> int:
"""Return the hash of the file path."""
@@ -228,12 +313,21 @@ MANIFEST_ENTRY_SCHEMA = Schema(
NestedField(1, "snapshot_id", LongType(), required=False),
NestedField(3, "data_sequence_number", LongType(), required=False),
NestedField(4, "file_sequence_number", LongType(), required=False),
- NestedField(2, "data_file", DATA_FILE_TYPE, required=True),
+ NestedField(2, "data_file", DATA_FILE_TYPE_V1, required=True),
)
# Struct form of MANIFEST_ENTRY_SCHEMA; its data_file field (id 2) uses the DATA_FILE_TYPE_V1 layout.
MANIFEST_ENTRY_SCHEMA_STRUCT = MANIFEST_ENTRY_SCHEMA.as_struct()
def manifest_entry_schema_with_data_file(data_file: StructType) -> Schema:
    """Return MANIFEST_ENTRY_SCHEMA with its data_file field (id 2) retyped as ``data_file``.

    All other manifest-entry fields are kept unchanged and in their original order.
    """
    entry_fields = []
    for entry_field in MANIFEST_ENTRY_SCHEMA.fields:
        if entry_field.field_id == 2:
            entry_field = NestedField(2, "data_file", data_file, required=True)
        entry_fields.append(entry_field)
    return Schema(*entry_fields)
+
+
class ManifestEntry(Record):
__slots__ = ("status", "snapshot_id", "data_sequence_number",
"file_sequence_number", "data_file")
status: ManifestEntryStatus
@@ -265,6 +359,54 @@ class PartitionFieldSummary(Record):
super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE,
**named_data})
class PartitionFieldStats:
    """Accumulate null/NaN flags and min/max bounds for the values of one partition field.

    ``to_summary`` turns the accumulated state into a PartitionFieldSummary, serializing the
    bounds with ``to_bytes`` for the field's Iceberg type.
    """

    _type: PrimitiveType
    _contains_null: bool
    _contains_nan: bool
    _min: Optional[Any]
    _max: Optional[Any]

    def __init__(self, iceberg_type: PrimitiveType) -> None:
        self._type = iceberg_type
        self._contains_null = self._contains_nan = False
        self._min = self._max = None

    def to_summary(self) -> PartitionFieldSummary:
        """Build the PartitionFieldSummary; a bound is None when no comparable value was seen."""
        lower = None if self._min is None else to_bytes(self._type, self._min)
        upper = None if self._max is None else to_bytes(self._type, self._max)
        return PartitionFieldSummary(
            contains_null=self._contains_null,
            contains_nan=self._contains_nan,
            lower_bound=lower,
            upper_bound=upper,
        )

    def update(self, value: Any) -> None:
        """Fold one partition value into the statistics.

        None sets the null flag and a float NaN sets the NaN flag. Neither affects the bounds.
        """
        if value is None:
            self._contains_null = True
            return
        if isinstance(value, float) and math.isnan(value):
            self._contains_nan = True
            return
        if self._min is None:
            # First comparable value: it is both the lower and the upper bound.
            self._min = self._max = value
            return
        self._min = min(self._min, value)
        self._max = max(self._max, value)
+
+
def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]:
    """Summarize the partition values of a manifest, one summary per partition field.

    Args:
        spec: Partition spec whose partition type (resolved against ``schema``) defines the fields.
        schema: Table schema used to resolve the partition type.
        partitions: Partition records, with values in partition-field order.

    Returns:
        One PartitionFieldSummary per partition field, in the order of the partition type.

    Raises:
        ValueError: If a partition field type is not a PrimitiveType (checked only while
            iterating ``partitions``, so never raised for an empty list).
    """
    partition_types = [field.field_type for field in spec.partition_type(schema).fields]
    accumulators = [PartitionFieldStats(partition_type) for partition_type in partition_types]
    for partition in partitions:
        for position, (partition_type, accumulator) in enumerate(zip(partition_types, accumulators)):
            if not isinstance(partition_type, PrimitiveType):
                raise ValueError(f"Expected a primitive type for the partition field, got {partition_type}")
            accumulator.update(partition[position])
    return [accumulator.to_summary() for accumulator in accumulators]
+
+
MANIFEST_FILE_SCHEMA: Schema = Schema(
NestedField(500, "manifest_path", StringType(), required=True,
doc="Location URI with FS scheme"),
NestedField(501, "manifest_length", LongType(), required=True),
@@ -405,3 +547,315 @@ def _inherit_sequence_number(entry: ManifestEntry,
manifest: ManifestFile) -> Ma
entry.file_sequence_number = manifest.sequence_number
return entry
+
+
class ManifestWriter(ABC):
    """Write ManifestEntry records to an Avro manifest and summarize them as a ManifestFile.

    Subclasses supply the format-version specific parts:
    - ``content``: the manifest content type;
    - ``new_writer``: the Avro writer for that version's entry schema;
    - ``prepare_entry``: per-entry adaptation before writing.

    Use the writer as a context manager. The Avro output is opened in ``__enter__`` and closed
    in ``__exit__``.
    """

    closed: bool
    _spec: PartitionSpec
    _schema: Schema
    _output_file: OutputFile
    _writer: AvroOutputFile[ManifestEntry]
    _snapshot_id: int
    _meta: Dict[str, str]
    _added_files: int
    _added_rows: int
    _existing_files: int
    _existing_rows: int
    _deleted_files: int
    _deleted_rows: int
    _min_data_sequence_number: Optional[int]
    _partitions: List[Record]

    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int, meta: Dict[str, str]):
        """Initialize the writer. Nothing is opened until the writer is entered.

        Args:
            spec: Partition spec of the entries. Provides the spec id and the partition type.
            schema: Table schema used to resolve the partition type.
            output_file: Destination of the manifest.
            snapshot_id: Recorded as the manifest's ``added_snapshot_id``.
            meta: Key/value metadata handed to the Avro writer.
        """
        self.closed = False
        self._spec = spec
        self._schema = schema
        self._output_file = output_file
        self._snapshot_id = snapshot_id
        self._meta = meta

        self._added_files = 0
        self._added_rows = 0
        self._existing_files = 0
        self._existing_rows = 0
        self._deleted_files = 0
        self._deleted_rows = 0
        self._min_data_sequence_number = None
        self._partitions = []

    def __enter__(self) -> ManifestWriter:
        """Open the writer."""
        self._writer = self.new_writer()
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the writer."""
        self.closed = True
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def content(self) -> ManifestContent:
        ...

    @abstractmethod
    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
        ...

    @abstractmethod
    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        ...

    def to_manifest_file(self) -> ManifestFile:
        """Return a ManifestFile describing the entries written so far.

        Marks the writer closed, so no further entries can be added. ``sequence_number`` is
        left as UNASSIGNED_SEQ. ``min_sequence_number`` is also UNASSIGNED_SEQ when no
        ADDED/EXISTING entry carried a data sequence number.
        """
        # once the manifest file is generated, no more entries can be added
        self.closed = True
        # Compare against None explicitly: 0 is a valid data sequence number, and a
        # truthiness check (`x or UNASSIGNED_SEQ`) would wrongly turn it into UNASSIGNED_SEQ.
        min_sequence_number = (
            UNASSIGNED_SEQ if self._min_data_sequence_number is None else self._min_data_sequence_number
        )
        return ManifestFile(
            manifest_path=self._output_file.location,
            manifest_length=len(self._writer.output_file),
            partition_spec_id=self._spec.spec_id,
            content=self.content(),
            sequence_number=UNASSIGNED_SEQ,
            min_sequence_number=min_sequence_number,
            added_snapshot_id=self._snapshot_id,
            added_files_count=self._added_files,
            existing_files_count=self._existing_files,
            deleted_files_count=self._deleted_files,
            added_rows_count=self._added_rows,
            existing_rows_count=self._existing_rows,
            deleted_rows_count=self._deleted_rows,
            partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
            key_metadatas=None,
        )

    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
        """Update the file/row counters for ``entry`` and write it after ``prepare_entry``.

        Returns:
            This writer, so calls can be chained.

        Raises:
            RuntimeError: If the writer has been closed.
        """
        if self.closed:
            raise RuntimeError("Cannot add entry to closed manifest writer")
        if entry.status == ManifestEntryStatus.ADDED:
            self._added_files += 1
            self._added_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.EXISTING:
            self._existing_files += 1
            self._existing_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.DELETED:
            self._deleted_files += 1
            self._deleted_rows += entry.data_file.record_count

        self._partitions.append(entry.data_file.partition)

        # Only ADDED/EXISTING entries contribute to the manifest's minimum data sequence number.
        if (
            (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
            and entry.data_sequence_number is not None
            and (self._min_data_sequence_number is None or entry.data_sequence_number < self._min_data_sequence_number)
        ):
            self._min_data_sequence_number = entry.data_sequence_number

        self._writer.write_block([self.prepare_entry(entry)])
        return self
+
+
class ManifestWriterV1(ManifestWriter):
    """Manifest writer for format version 1 tables."""

    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
        super().__init__(
            spec,
            schema,
            output_file,
            snapshot_id,
            {
                "schema": schema.json(),
                "partition-spec": spec.json(),
                "partition-spec-id": str(spec.spec_id),
                "format-version": "1",
            },
        )

    def content(self) -> ManifestContent:
        return ManifestContent.DATA

    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
        v1_data_file_type = data_file_with_partition(self._spec.partition_type(self._schema), format_version=1)
        v1_manifest_entry_schema = manifest_entry_schema_with_data_file(v1_data_file_type)
        return AvroOutputFile[ManifestEntry](self._output_file, v1_manifest_entry_schema, "manifest_entry", self._meta)

    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        """Return a copy of ``entry`` whose data file carries the v1 block_size_in_bytes default.

        The caller's entry and data file are not modified. Previously the shallow entry copy
        shared its data file with the caller, so setting block_size_in_bytes leaked back to the input.
        """
        source = entry.data_file
        data_file = DataFile(format_version=1)
        for field in DATA_FILE_TYPE_V1.fields:
            # Copy only attributes the source has set. Unset ones stay unset, and
            # record_fields() reports them as None, as before.
            if hasattr(source, field.name):
                setattr(data_file, field.name, getattr(source, field.name))
        data_file.block_size_in_bytes = DEFAULT_BLOCK_SIZE

        wrapped_entry = ManifestEntry(*entry.record_fields())
        wrapped_entry.data_file = data_file
        return wrapped_entry
+
+
class ManifestWriterV2(ManifestWriter):
    """Manifest writer for format version 2 tables."""

    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
        super().__init__(
            spec,
            schema,
            output_file,
            snapshot_id,
            {
                "schema": schema.json(),
                "partition-spec": spec.json(),
                "partition-spec-id": str(spec.spec_id),
                "format-version": "2",
                "content": "data",
            },
        )

    def content(self) -> ManifestContent:
        return ManifestContent.DATA

    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
        v2_data_file_type = data_file_with_partition(self._spec.partition_type(self._schema), format_version=2)
        v2_manifest_entry_schema = manifest_entry_schema_with_data_file(v2_data_file_type)
        return AvroOutputFile[ManifestEntry](self._output_file, v2_manifest_entry_schema, "manifest_entry", self._meta)

    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        """Validate ``entry`` and return a copy laid out with the v2 data_file struct.

        Raises:
            ValueError: If the entry has no data sequence number but comes from another
                snapshot, or has a status other than ADDED.
        """
        if entry.data_sequence_number is None:
            if entry.snapshot_id is not None and entry.snapshot_id != self._snapshot_id:
                raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry.snapshot_id}")
            if entry.status != ManifestEntryStatus.ADDED:
                raise ValueError("Only entries with status ADDED can have null sequence number")

        # In v2, we should not write block_size_in_bytes field: re-wrap the data file using the
        # DATA_FILE_TYPE_V2 layout. The copy is driven by the struct rather than a hand-written
        # keyword list, so it cannot drift from the schema. Unset optional fields stay unset
        # (reported as None by record_fields()) instead of raising AttributeError.
        source = entry.data_file
        data_file = DataFile(format_version=2)
        for field in DATA_FILE_TYPE_V2.fields:
            if hasattr(source, field.name):
                setattr(data_file, field.name, getattr(source, field.name))

        wrapped_entry = ManifestEntry(*entry.record_fields())
        wrapped_entry.data_file = data_file
        return wrapped_entry
+
+
def write_manifest(
    format_version: Literal[1, 2], spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
) -> ManifestWriter:
    """Create the manifest writer matching the table's format version.

    Raises:
        ValueError: If ``format_version`` is neither 1 nor 2.
    """
    if format_version not in (1, 2):
        raise ValueError(f"Cannot write manifest for table version: {format_version}")
    writer_class = ManifestWriterV1 if format_version == 1 else ManifestWriterV2
    return writer_class(spec, schema, output_file, snapshot_id)
+
+
class ManifestListWriter(ABC):
    """Write ManifestFile records to an Avro manifest list.

    Use as a context manager. The Avro output is opened on enter and closed on exit.
    Subclasses implement ``prepare_manifest`` to validate or adjust each manifest before it
    is written.
    """

    _output_file: OutputFile
    _meta: Dict[str, str]
    _manifest_files: List[ManifestFile]
    _commit_snapshot_id: int
    _writer: AvroOutputFile[ManifestFile]

    def __init__(self, output_file: OutputFile, meta: Dict[str, str]):
        """Remember the destination and the Avro header metadata. Nothing is opened yet."""
        self._output_file, self._meta = output_file, meta
        self._manifest_files = []

    def __enter__(self) -> ManifestListWriter:
        """Open the Avro output for the manifest-file schema and return this writer."""
        self._writer = AvroOutputFile[ManifestFile](
            self._output_file,
            MANIFEST_FILE_SCHEMA,
            "manifest_file",
            self._meta,
        )
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the Avro output, forwarding any in-flight exception details."""
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        """Return the (possibly adjusted) manifest file to write, or raise if it is invalid."""

    def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter:
        """Write ``manifest_files`` as one block after passing each through ``prepare_manifest``."""
        prepared = [self.prepare_manifest(candidate) for candidate in manifest_files]
        self._writer.write_block(prepared)
        return self
+
+
class ManifestListWriterV1(ManifestListWriter):
    """Manifest list writer for format version 1 tables (data manifests only)."""

    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
        """Initialize the writer with the v1 Avro header metadata.

        Args:
            output_file: Destination of the manifest list.
            snapshot_id: Snapshot that this manifest list belongs to.
            parent_snapshot_id: Parent snapshot, or None for a table's first snapshot.
        """
        super().__init__(
            output_file,
            {
                "snapshot-id": str(snapshot_id),
                # A first snapshot has no parent. Write "null" rather than str(None) == "None".
                "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                "format-version": "1",
            },
        )

    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        """Return ``manifest_file`` unchanged.

        Raises:
            ValidationError: If it is not a data manifest (v1 has no delete manifests).
        """
        if manifest_file.content != ManifestContent.DATA:
            raise ValidationError("Cannot store delete manifests in a v1 table")
        return manifest_file
+
+
class ManifestListWriterV2(ManifestListWriter):
    """Manifest list writer for format version 2 tables.

    Assigns this commit's sequence number to manifests that were written by the committing
    snapshot and still carry UNASSIGNED_SEQ.
    """

    _commit_snapshot_id: int
    _sequence_number: int

    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
        """Initialize the writer with the v2 Avro header metadata.

        Args:
            output_file: Destination of the manifest list.
            snapshot_id: Committing snapshot. Only its manifests may have sequence numbers assigned here.
            parent_snapshot_id: Parent snapshot, or None for a table's first snapshot.
            sequence_number: Sequence number of this commit.
        """
        super().__init__(
            output_file,
            {
                "snapshot-id": str(snapshot_id),
                # A first snapshot has no parent. Write "null" rather than str(None) == "None".
                "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                "sequence-number": str(sequence_number),
                "format-version": "2",
            },
        )
        self._commit_snapshot_id = snapshot_id
        self._sequence_number = sequence_number

    def _require_added_by_commit(self, manifest_file: ManifestFile) -> None:
        """Raise unless ``manifest_file`` was added by the committing snapshot."""
        if self._commit_snapshot_id != manifest_file.added_snapshot_id:
            raise ValueError(
                f"Found unassigned sequence number for a manifest from snapshot: {manifest_file.added_snapshot_id}"
            )

    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        """Return a copy of ``manifest_file`` with unassigned sequence numbers filled in.

        Raises:
            ValueError: If a sequence number is unassigned on a manifest from another snapshot.
        """
        wrapped_manifest_file = ManifestFile(*manifest_file.record_fields())

        if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
            # if the sequence number is being assigned here, then the manifest must be created by the current operation.
            self._require_added_by_commit(wrapped_manifest_file)
            wrapped_manifest_file.sequence_number = self._sequence_number

        if wrapped_manifest_file.min_sequence_number == UNASSIGNED_SEQ:
            self._require_added_by_commit(wrapped_manifest_file)
            # if the min sequence number is not determined, then there was no assigned sequence number for any file
            # written to the wrapped manifest. Replace the unassigned sequence number with the one for this commit
            wrapped_manifest_file.min_sequence_number = self._sequence_number
        return wrapped_manifest_file
+
+
def write_manifest_list(
    format_version: Literal[1, 2], output_file: OutputFile, snapshot_id: int, parent_snapshot_id: int, sequence_number: int
) -> ManifestListWriter:
    """Create the manifest list writer matching the table's format version.

    ``sequence_number`` is used only by the v2 writer.

    Raises:
        ValueError: If ``format_version`` is neither 1 nor 2.
    """
    if format_version not in (1, 2):
        raise ValueError(f"Cannot write manifest list for table version: {format_version}")
    if format_version == 1:
        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
    return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)
diff --git a/python/pyiceberg/typedef.py b/python/pyiceberg/typedef.py
index bb9a438a03..ff2a6d1cb0 100644
--- a/python/pyiceberg/typedef.py
+++ b/python/pyiceberg/typedef.py
@@ -195,4 +195,5 @@ class Record(StructProtocol):
return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}'
for key, value in self.__dict__.items() if not key.startswith('_'))}]"
def record_fields(self) -> List[str]:
+ """Return values of all the fields of the Record class except those
specified in skip_fields."""
return [self.__getattribute__(v) if hasattr(self, v) else None for v
in self._position_to_field_name]
diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py
index 2738770492..e9dcc7eca1 100644
--- a/python/tests/avro/test_file.py
+++ b/python/tests/avro/test_file.py
@@ -124,6 +124,7 @@ def
test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None:
partition=Record(),
record_count=131327,
file_size_in_bytes=220669226,
+ block_size_in_bytes=67108864,
column_sizes={1: 220661854},
value_counts={1: 131327},
null_value_counts={1: 0},
diff --git a/python/tests/test_integration_manifest.py
b/python/tests/test_integration_manifest.py
new file mode 100644
index 0000000000..34b20f271d
--- /dev/null
+++ b/python/tests/test_integration_manifest.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import inspect
+from enum import Enum
+from tempfile import TemporaryDirectory
+from typing import Any
+
+import pytest
+from fastavro import reader
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.io.pyarrow import PyArrowFileIO
+from pyiceberg.manifest import (
+ DataFile,
+ ManifestEntry,
+ write_manifest,
+)
+from pyiceberg.table import Table
+from pyiceberg.utils.lazydict import LazyDict
+
+
+# helper function to serialize our objects to dicts to enable
+# direct comparison with the dicts returned by fastavro
def todict(obj: Any) -> Any:
    """Convert ``obj`` into the plain structure fastavro yields when reading it back.

    - Mappings become a list of ``{"key": ..., "value": ...}`` dicts; values are not converted.
    - Enums become their value.
    - Other iterables (except str/bytes) are converted element-wise into a list.
    - Objects with ``__dict__`` become a dict of their public, non-callable members.
    - Anything else is returned unchanged.
    """
    if isinstance(obj, dict) or isinstance(obj, LazyDict):
        return [{"key": key, "value": value} for key, value in obj.items()]
    if isinstance(obj, Enum):
        return obj.value
    if hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes)):
        return [todict(item) for item in obj]
    if hasattr(obj, "__dict__"):
        members = inspect.getmembers(obj)
        return {name: todict(member) for name, member in members if not callable(member) and not name.startswith("_")}
    return obj
+
+
@pytest.fixture()
def catalog() -> Catalog:
    """Load the "local" REST catalog of the integration environment.

    The REST service is at localhost:8181 and the S3 endpoint is at localhost:9000.
    """
    return load_catalog(
        "local",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )
+
+
@pytest.fixture()
def table_test_all_types(catalog: Catalog) -> Table:
    """Load the ``default.test_all_types`` table from the integration catalog."""
    return catalog.load_table("default.test_all_types")
+
+
@pytest.mark.integration
def test_write_sample_manifest(table_test_all_types: Table) -> None:
    """Round-trip one manifest entry of ``default.test_all_types`` through a v2 ManifestWriter.

    The written entry is read back with fastavro and compared with the expected v2 entry.
    The expected entry's data file is rebuilt with the v2 layout, which has no
    block_size_in_bytes field.
    """
    test_snapshot = table_test_all_types.current_snapshot()
    if test_snapshot is None:
        raise ValueError("Table has no current snapshot, check the docker environment")
    io = table_test_all_types.io
    test_manifest_file = test_snapshot.manifests(io)[0]
    test_manifest_entries = test_manifest_file.fetch_manifest_entry(io)
    entry = test_manifest_entries[0]
    test_schema = table_test_all_types.schema()
    test_spec = table_test_all_types.spec()
    expected_data_file = DataFile(
        format_version=2,
        content=entry.data_file.content,
        file_path=entry.data_file.file_path,
        file_format=entry.data_file.file_format,
        partition=entry.data_file.partition,
        record_count=entry.data_file.record_count,
        file_size_in_bytes=entry.data_file.file_size_in_bytes,
        column_sizes=entry.data_file.column_sizes,
        value_counts=entry.data_file.value_counts,
        null_value_counts=entry.data_file.null_value_counts,
        nan_value_counts=entry.data_file.nan_value_counts,
        lower_bounds=entry.data_file.lower_bounds,
        upper_bounds=entry.data_file.upper_bounds,
        key_metadata=entry.data_file.key_metadata,
        split_offsets=entry.data_file.split_offsets,
        equality_ids=entry.data_file.equality_ids,
        sort_order_id=entry.data_file.sort_order_id,
        spec_id=entry.data_file.spec_id,
    )
    expected_entry = ManifestEntry(*entry.record_fields())
    expected_entry.data_file = expected_data_file
    with TemporaryDirectory() as tmpdir:
        tmp_avro_file = tmpdir + "/test_write_manifest.avro"
        output = PyArrowFileIO().new_output(tmp_avro_file)
        with write_manifest(
            format_version=2,
            spec=test_spec,
            schema=test_schema,
            output_file=output,
            snapshot_id=test_snapshot.snapshot_id,
        ) as manifest_writer:
            # For simplicity, write a single entry only
            manifest_writer.add_entry(entry)

        with open(tmp_avro_file, "rb") as fo:
            fa_entry = next(iter(reader(fo=fo)))

    assert fa_entry == todict(expected_entry)
diff --git a/python/tests/utils/test_manifest.py
b/python/tests/utils/test_manifest.py
index 76a4a8a2b4..41af844bba 100644
--- a/python/tests/utils/test_manifest.py
+++ b/python/tests/utils/test_manifest.py
@@ -14,6 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+# pylint: disable=redefined-outer-name,arguments-renamed,fixme
+from tempfile import TemporaryDirectory
+from typing import Dict
+
+import fastavro
+import pytest
from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -26,9 +32,25 @@ from pyiceberg.manifest import (
ManifestFile,
PartitionFieldSummary,
read_manifest_list,
+ write_manifest,
+ write_manifest_list,
)
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
from pyiceberg.table import Snapshot
from pyiceberg.table.snapshots import Operation, Summary
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import Record
+from pyiceberg.types import IntegerType, NestedField
+
+
def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None:
    """Assert that the Avro header metadata of ``avro_file`` contains every expected key/value pair."""
    with open(avro_file, "rb") as avro_stream:
        header_metadata = fastavro.reader(avro_stream).metadata
        for expected_key, expected_value in expected_metadata.items():
            assert expected_key in header_metadata
            assert header_metadata[expected_key] == expected_value
def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
@@ -278,3 +300,245 @@ def
test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
assert entry.file_sequence_number == 3
assert entry.snapshot_id == 8744736658442914487
assert entry.status == ManifestEntryStatus.ADDED
+
+
[email protected]("format_version", [1, 2])
+def test_write_manifest(generated_manifest_file_file_v1: str,
generated_manifest_file_file_v2: str, format_version: int) -> None:
+ io = load_file_io()
+ snapshot = Snapshot(
+ snapshot_id=25,
+ parent_snapshot_id=19,
+ timestamp_ms=1602638573590,
+ manifest_list=generated_manifest_file_file_v1 if format_version == 1
else generated_manifest_file_file_v2,
+ summary=Summary(Operation.APPEND),
+ schema_id=3,
+ )
+ demo_manifest_file = snapshot.manifests(io)[0]
+ manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
+ test_schema = Schema(
+ NestedField(1, "VendorID", IntegerType(), False), NestedField(2,
"tpep_pickup_datetime", IntegerType(), False)
+ )
+ test_spec = PartitionSpec(
+ PartitionField(source_id=1, field_id=1, transform=IdentityTransform(),
name="VendorID"),
+ PartitionField(source_id=2, field_id=2, transform=IdentityTransform(),
name="tpep_pickup_datetime"),
+ spec_id=demo_manifest_file.partition_spec_id,
+ )
+ with TemporaryDirectory() as tmpdir:
+ tmp_avro_file = tmpdir + "/test_write_manifest.avro"
+ output = io.new_output(tmp_avro_file)
+ with write_manifest(
+ format_version=format_version, # type: ignore
+ spec=test_spec,
+ schema=test_schema,
+ output_file=output,
+ snapshot_id=8744736658442914487,
+ ) as writer:
+ for entry in manifest_entries:
+ writer.add_entry(entry)
+ new_manifest = writer.to_manifest_file()
+ with pytest.raises(RuntimeError):
+ writer.add_entry(manifest_entries[0])
+
+ expected_metadata = {
+ "schema": test_schema.json(),
+ "partition-spec": test_spec.json(),
+ "partition-spec-id": str(test_spec.spec_id),
+ "format-version": str(format_version),
+ }
+ if format_version == 2:
+ expected_metadata["content"] = "data"
+ _verify_metadata_with_fastavro(
+ tmp_avro_file,
+ expected_metadata,
+ )
+ new_manifest_entries = new_manifest.fetch_manifest_entry(io)
+
+ manifest_entry = new_manifest_entries[0]
+
+ assert manifest_entry.status == ManifestEntryStatus.ADDED
+ assert manifest_entry.snapshot_id == 8744736658442914487
+ assert manifest_entry.data_sequence_number == 0 if format_version == 1
else 3
+ assert isinstance(manifest_entry.data_file, DataFile)
+
+ data_file = manifest_entry.data_file
+
+ assert data_file.content is DataFileContent.DATA
+ assert (
+ data_file.file_path
+ ==
"/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
+ )
+ assert data_file.file_format == FileFormat.PARQUET
+ assert data_file.partition == Record(VendorID=1,
tpep_pickup_datetime=1925)
+ assert data_file.record_count == 19513
+ assert data_file.file_size_in_bytes == 388872
+ if format_version == 1:
+ assert data_file.block_size_in_bytes == 67108864
+ else:
+ assert data_file.block_size_in_bytes is None
+ assert data_file.column_sizes == {
+ 1: 53,
+ 2: 98153,
+ 3: 98693,
+ 4: 53,
+ 5: 53,
+ 6: 53,
+ 7: 17425,
+ 8: 18528,
+ 9: 53,
+ 10: 44788,
+ 11: 35571,
+ 12: 53,
+ 13: 1243,
+ 14: 2355,
+ 15: 12750,
+ 16: 4029,
+ 17: 110,
+ 18: 47194,
+ 19: 2948,
+ }
+ assert data_file.value_counts == {
+ 1: 19513,
+ 2: 19513,
+ 3: 19513,
+ 4: 19513,
+ 5: 19513,
+ 6: 19513,
+ 7: 19513,
+ 8: 19513,
+ 9: 19513,
+ 10: 19513,
+ 11: 19513,
+ 12: 19513,
+ 13: 19513,
+ 14: 19513,
+ 15: 19513,
+ 16: 19513,
+ 17: 19513,
+ 18: 19513,
+ 19: 19513,
+ }
+ assert data_file.null_value_counts == {
+ 1: 19513,
+ 2: 0,
+ 3: 0,
+ 4: 19513,
+ 5: 19513,
+ 6: 19513,
+ 7: 0,
+ 8: 0,
+ 9: 19513,
+ 10: 0,
+ 11: 0,
+ 12: 19513,
+ 13: 0,
+ 14: 0,
+ 15: 0,
+ 16: 0,
+ 17: 0,
+ 18: 0,
+ 19: 0,
+ }
+ assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10:
0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
+ assert data_file.lower_bounds == {
+ 2: b"2020-04-01 00:00",
+ 3: b"2020-04-01 00:12",
+ 7: b"\x03\x00\x00\x00",
+ 8: b"\x01\x00\x00\x00",
+ 10: b"\xf6(\\\x8f\xc2\x05S\xc0",
+ 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
+ 15: b")\\\x8f\xc2\xf5(\x08\xc0",
+ 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
+ 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
+ }
+ assert data_file.upper_bounds == {
+ 2: b"2020-04-30 23:5:",
+ 3: b"2020-05-01 00:41",
+ 7: b"\t\x01\x00\x00",
+ 8: b"\t\x01\x00\x00",
+ 10: b"\xcd\xcc\xcc\xcc\xcc,_@",
+ 11: b"\x1f\x85\xebQ\\\xe2\xfe@",
+ 13: b"\x00\x00\x00\x00\x00\x00\x12@",
+ 14: b"\x00\x00\x00\x00\x00\x00\xe0?",
+ 15: b"q=\n\xd7\xa3\xf01@",
+ 16: b"\x00\x00\x00\x00\x00`B@",
+ 17: b"333333\xd3?",
+ 18: b"\x00\x00\x00\x00\x00\x18b@",
+ 19: b"\x00\x00\x00\x00\x00\x00\x04@",
+ }
+ assert data_file.key_metadata is None
+ assert data_file.split_offsets == [4]
+ assert data_file.equality_ids is None
+ assert data_file.sort_order_id == 0
+
+
[email protected]("format_version", [1, 2])
+def test_write_manifest_list(
+ generated_manifest_file_file_v1: str, generated_manifest_file_file_v2:
str, format_version: int
+) -> None:
+ io = load_file_io()
+
+ snapshot = Snapshot(
+ snapshot_id=25,
+ parent_snapshot_id=19,
+ timestamp_ms=1602638573590,
+ manifest_list=generated_manifest_file_file_v1 if format_version == 1
else generated_manifest_file_file_v2,
+ summary=Summary(Operation.APPEND),
+ schema_id=3,
+ )
+
+ demo_manifest_list = snapshot.manifests(io)
+ with TemporaryDirectory() as tmp_dir:
+ path = tmp_dir + "/manifest-list.avro"
+ output = io.new_output(path)
+ with write_manifest_list(
+ format_version=format_version, output_file=output, snapshot_id=25,
parent_snapshot_id=19, sequence_number=0 # type: ignore
+ ) as writer:
+ writer.add_manifests(demo_manifest_list)
+ new_manifest_list = list(read_manifest_list(io.new_input(path)))
+
+ expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "19",
"format-version": str(format_version)}
+ if format_version == 2:
+ expected_metadata["sequence-number"] = "0"
+ _verify_metadata_with_fastavro(path, expected_metadata)
+
+ manifest_file = new_manifest_list[0]
+
+ assert manifest_file.manifest_length == 7989
+ assert manifest_file.partition_spec_id == 0
+ assert manifest_file.content == ManifestContent.DATA if format_version
== 1 else ManifestContent.DELETES
+ assert manifest_file.sequence_number == 0 if format_version == 1 else 3
+ assert manifest_file.min_sequence_number == 0 if format_version == 1
else 3
+ assert manifest_file.added_snapshot_id == 9182715666859759686
+ assert manifest_file.added_files_count == 3
+ assert manifest_file.existing_files_count == 0
+ assert manifest_file.deleted_files_count == 0
+ assert manifest_file.added_rows_count == 237993
+ assert manifest_file.existing_rows_count == 0
+ assert manifest_file.deleted_rows_count == 0
+ assert manifest_file.key_metadata is None
+
+ assert isinstance(manifest_file.partitions, list)
+
+ partition = manifest_file.partitions[0]
+
+ assert isinstance(partition, PartitionFieldSummary)
+
+ assert partition.contains_null is True
+ assert partition.contains_nan is False
+ assert partition.lower_bound == b"\x01\x00\x00\x00"
+ assert partition.upper_bound == b"\x02\x00\x00\x00"
+
+ entries = manifest_file.fetch_manifest_entry(io)
+
+ assert isinstance(entries, list)
+
+ entry = entries[0]
+
+ assert entry.data_sequence_number == 0 if format_version == 1 else 3
+ assert entry.file_sequence_number == 0 if format_version == 1 else 3
+ assert entry.snapshot_id == 8744736658442914487
+ assert entry.status == ManifestEntryStatus.ADDED