This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8062aef8b6 Python: ManifestWriter and ManifestListWriter (#8622)
8062aef8b6 is described below

commit 8062aef8b68ef18647ea10e01ff6e82a89582fe9
Author: HonahX <[email protected]>
AuthorDate: Fri Sep 29 02:00:29 2023 -0700

    Python: ManifestWriter and ManifestListWriter (#8622)
    
    * add ManifestWriter and ManifestListWriter
    
    * fix lint issue
    
    * remove assert, fix format issue
    
    * add prepare to ManifestWriter, remove TODO, fix format issue
    
    * fix some nit issues, add prepare... to ensure the correctness of the data written
    
    * fix format issue
    
    * fix lint issue
    
    * avoid creating too many objects; handle the v1 and v2 data_file_type and DataFile class properly.
    
    * modify tests
    
    * refactor the way of handling the two versions of the DataFile record
    
    * add integration tests, fix bugs, change PartitionSummary to a function
    
    * fix format issue
    
    * make data_type_v2 constants
---
 python/pyiceberg/avro/writer.py           |   4 +-
 python/pyiceberg/manifest.py              | 468 +++++++++++++++++++++++++++++-
 python/pyiceberg/typedef.py               |   1 +
 python/tests/avro/test_file.py            |   1 +
 python/tests/test_integration_manifest.py | 126 ++++++++
 python/tests/utils/test_manifest.py       | 264 +++++++++++++++++
 6 files changed, 855 insertions(+), 9 deletions(-)

diff --git a/python/pyiceberg/avro/writer.py b/python/pyiceberg/avro/writer.py
index fba17e8971..ad6a755614 100644
--- a/python/pyiceberg/avro/writer.py
+++ b/python/pyiceberg/avro/writer.py
@@ -34,7 +34,7 @@ from typing import (
 from uuid import UUID
 
 from pyiceberg.avro.encoder import BinaryEncoder
-from pyiceberg.types import StructType
+from pyiceberg.typedef import Record
 from pyiceberg.utils.decimal import decimal_required_bytes, decimal_to_bytes
 from pyiceberg.utils.singleton import Singleton
 
@@ -160,7 +160,7 @@ class OptionWriter(Writer):
 class StructWriter(Writer):
     field_writers: Tuple[Writer, ...] = dataclassfield()
 
-    def write(self, encoder: BinaryEncoder, val: StructType) -> None:
+    def write(self, encoder: BinaryEncoder, val: Record) -> None:
         for writer, value in zip(self.field_writers, val.record_fields()):
             writer.write(encoder, value)
 
diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py
index 57ec11db77..8bdbfd3524 100644
--- a/python/pyiceberg/manifest.py
+++ b/python/pyiceberg/manifest.py
@@ -14,31 +14,51 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
+import math
+from abc import ABC, abstractmethod
 from enum import Enum
+from functools import singledispatch
+from types import TracebackType
 from typing import (
     Any,
     Dict,
     Iterator,
     List,
+    Literal,
     Optional,
+    Type,
 )
 
-from pyiceberg.avro.file import AvroFile
-from pyiceberg.io import FileIO, InputFile
+from pyiceberg.avro.file import AvroFile, AvroOutputFile
+from pyiceberg.conversions import to_bytes
+from pyiceberg.exceptions import ValidationError
+from pyiceberg.io import FileIO, InputFile, OutputFile
+from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.typedef import Record
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
+    DateType,
+    IcebergType,
     IntegerType,
     ListType,
     LongType,
     MapType,
     NestedField,
+    PrimitiveType,
     StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
 )
 
+UNASSIGNED_SEQ = -1
+DEFAULT_BLOCK_SIZE = 67108864  # 64 * 1024 * 1024
+
 
 class DataFileContent(int, Enum):
     DATA = 0
@@ -79,7 +99,7 @@ class FileFormat(str, Enum):
         return f"FileFormat.{self.name}"
 
 
-DATA_FILE_TYPE = StructType(
+DATA_FILE_TYPE_V1 = StructType(
     NestedField(
         field_id=134,
         name="content",
@@ -90,7 +110,11 @@ DATA_FILE_TYPE = StructType(
     ),
     NestedField(field_id=100, name="file_path", field_type=StringType(), 
required=True, doc="Location URI with FS scheme"),
     NestedField(
-        field_id=101, name="file_format", field_type=StringType(), 
required=True, doc="File format name: avro, orc, or parquet"
+        field_id=101,
+        name="file_format",
+        field_type=StringType(),
+        required=True,
+        doc="File format name: avro, orc, or parquet",
     ),
     NestedField(
         field_id=102,
@@ -101,6 +125,13 @@ DATA_FILE_TYPE = StructType(
     ),
     NestedField(field_id=103, name="record_count", field_type=LongType(), 
required=True, doc="Number of records in the file"),
     NestedField(field_id=104, name="file_size_in_bytes", 
field_type=LongType(), required=True, doc="Total file size in bytes"),
+    NestedField(
+        field_id=105,
+        name="block_size_in_bytes",
+        field_type=LongType(),
+        required=False,
+        doc="Deprecated. Always write a default in v1. Do not write in v2.",
+    ),
     NestedField(
         field_id=108,
         name="column_sizes",
@@ -162,6 +193,55 @@ DATA_FILE_TYPE = StructType(
     NestedField(field_id=141, name="spec_id", field_type=IntegerType(), 
required=False, doc="Partition spec ID"),
 )
 
+DATA_FILE_TYPE_V2 = StructType(*[field for field in DATA_FILE_TYPE_V1.fields 
if field.field_id != 105])
+
+
+@singledispatch
+def partition_field_to_data_file_partition_field(partition_field_type: 
IcebergType) -> PrimitiveType:
+    raise TypeError(f"Unsupported partition field type: 
{partition_field_type}")
+
+
+@partition_field_to_data_file_partition_field.register(LongType)
+@partition_field_to_data_file_partition_field.register(DateType)
+@partition_field_to_data_file_partition_field.register(TimeType)
+@partition_field_to_data_file_partition_field.register(TimestampType)
+@partition_field_to_data_file_partition_field.register(TimestamptzType)
+def _(partition_field_type: PrimitiveType) -> IntegerType:
+    return IntegerType()
+
+
+@partition_field_to_data_file_partition_field.register(PrimitiveType)
+def _(partition_field_type: PrimitiveType) -> PrimitiveType:
+    return partition_field_type
+
+
+def data_file_with_partition(partition_type: StructType, format_version: 
Literal[1, 2]) -> StructType:
+    data_file_partition_type = StructType(
+        *[
+            NestedField(
+                field_id=field.field_id,
+                name=field.name,
+                
field_type=partition_field_to_data_file_partition_field(field.field_type),
+            )
+            for field in partition_type.fields
+        ]
+    )
+
+    return StructType(
+        *[
+            NestedField(
+                field_id=102,
+                name="partition",
+                field_type=data_file_partition_type,
+                required=True,
+                doc="Partition data tuple, schema based on the partition spec",
+            )
+            if field.field_id == 102
+            else field
+            for field in (DATA_FILE_TYPE_V1.fields if format_version == 1 else 
DATA_FILE_TYPE_V2.fields)
+        ]
+    )
+
 
 class DataFile(Record):
     __slots__ = (
@@ -171,6 +251,7 @@ class DataFile(Record):
         "partition",
         "record_count",
         "file_size_in_bytes",
+        "block_size_in_bytes",
         "column_sizes",
         "value_counts",
         "null_value_counts",
@@ -189,6 +270,7 @@ class DataFile(Record):
     partition: Record
     record_count: int
     file_size_in_bytes: int
+    block_size_in_bytes: Optional[int]
     column_sizes: Dict[int, int]
     value_counts: Dict[int, int]
     null_value_counts: Dict[int, int]
@@ -208,8 +290,11 @@ class DataFile(Record):
             value = FileFormat[value]
         super().__setattr__(name, value)
 
-    def __init__(self, *data: Any, **named_data: Any) -> None:
-        super().__init__(*data, **{"struct": DATA_FILE_TYPE, **named_data})
+    def __init__(self, format_version: Literal[1, 2] = 1, *data: Any, 
**named_data: Any) -> None:
+        super().__init__(
+            *data,
+            **{"struct": DATA_FILE_TYPE_V1 if format_version == 1 else 
DATA_FILE_TYPE_V2, **named_data},
+        )
 
     def __hash__(self) -> int:
         """Return the hash of the file path."""
@@ -228,12 +313,21 @@ MANIFEST_ENTRY_SCHEMA = Schema(
     NestedField(1, "snapshot_id", LongType(), required=False),
     NestedField(3, "data_sequence_number", LongType(), required=False),
     NestedField(4, "file_sequence_number", LongType(), required=False),
-    NestedField(2, "data_file", DATA_FILE_TYPE, required=True),
+    NestedField(2, "data_file", DATA_FILE_TYPE_V1, required=True),
 )
 
 MANIFEST_ENTRY_SCHEMA_STRUCT = MANIFEST_ENTRY_SCHEMA.as_struct()
 
 
+def manifest_entry_schema_with_data_file(data_file: StructType) -> Schema:
+    return Schema(
+        *[
+            NestedField(2, "data_file", data_file, required=True) if 
field.field_id == 2 else field
+            for field in MANIFEST_ENTRY_SCHEMA.fields
+        ]
+    )
+
+
 class ManifestEntry(Record):
     __slots__ = ("status", "snapshot_id", "data_sequence_number", 
"file_sequence_number", "data_file")
     status: ManifestEntryStatus
@@ -265,6 +359,54 @@ class PartitionFieldSummary(Record):
         super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, 
**named_data})
 
 
+class PartitionFieldStats:
+    _type: PrimitiveType
+    _contains_null: bool
+    _contains_nan: bool
+    _min: Optional[Any]
+    _max: Optional[Any]
+
+    def __init__(self, iceberg_type: PrimitiveType) -> None:
+        self._type = iceberg_type
+        self._contains_null = False
+        self._contains_nan = False
+        self._min = None
+        self._max = None
+
+    def to_summary(self) -> PartitionFieldSummary:
+        return PartitionFieldSummary(
+            contains_null=self._contains_null,
+            contains_nan=self._contains_nan,
+            lower_bound=to_bytes(self._type, self._min) if self._min is not 
None else None,
+            upper_bound=to_bytes(self._type, self._max) if self._max is not 
None else None,
+        )
+
+    def update(self, value: Any) -> None:
+        if value is None:
+            self._contains_null = True
+        elif isinstance(value, float) and math.isnan(value):
+            self._contains_nan = True
+        else:
+            if self._min is None:
+                self._min = value
+                self._max = value
+            else:
+                self._max = max(self._max, value)
+                self._min = min(self._min, value)
+
+
+def construct_partition_summaries(spec: PartitionSpec, schema: Schema, 
partitions: List[Record]) -> List[PartitionFieldSummary]:
+    types = [field.field_type for field in spec.partition_type(schema).fields]
+    field_stats = [PartitionFieldStats(field_type) for field_type in types]
+    for partition_keys in partitions:
+        for i, field_type in enumerate(types):
+            if not isinstance(field_type, PrimitiveType):
+                raise ValueError(f"Expected a primitive type for the partition 
field, got {field_type}")
+            partition_key = partition_keys[i]
+            field_stats[i].update(partition_key)
+    return [field.to_summary() for field in field_stats]
+
+
 MANIFEST_FILE_SCHEMA: Schema = Schema(
     NestedField(500, "manifest_path", StringType(), required=True, 
doc="Location URI with FS scheme"),
     NestedField(501, "manifest_length", LongType(), required=True),
@@ -405,3 +547,315 @@ def _inherit_sequence_number(entry: ManifestEntry, 
manifest: ManifestFile) -> Ma
         entry.file_sequence_number = manifest.sequence_number
 
     return entry
+
+
+class ManifestWriter(ABC):
+    closed: bool
+    _spec: PartitionSpec
+    _schema: Schema
+    _output_file: OutputFile
+    _writer: AvroOutputFile[ManifestEntry]
+    _snapshot_id: int
+    _meta: Dict[str, str]
+    _added_files: int
+    _added_rows: int
+    _existing_files: int
+    _existing_rows: int
+    _deleted_files: int
+    _deleted_rows: int
+    _min_data_sequence_number: Optional[int]
+    _partitions: List[Record]
+
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int, meta: Dict[str, str]):
+        self.closed = False
+        self._spec = spec
+        self._schema = schema
+        self._output_file = output_file
+        self._snapshot_id = snapshot_id
+        self._meta = meta
+
+        self._added_files = 0
+        self._added_rows = 0
+        self._existing_files = 0
+        self._existing_rows = 0
+        self._deleted_files = 0
+        self._deleted_rows = 0
+        self._min_data_sequence_number = None
+        self._partitions = []
+
+    def __enter__(self) -> ManifestWriter:
+        """Open the writer."""
+        self._writer = self.new_writer()
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Close the writer."""
+        self.closed = True
+        self._writer.__exit__(exc_type, exc_value, traceback)
+
+    @abstractmethod
+    def content(self) -> ManifestContent:
+        ...
+
+    @abstractmethod
+    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
+        ...
+
+    @abstractmethod
+    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
+        ...
+
+    def to_manifest_file(self) -> ManifestFile:
+        """Return the manifest file."""
+        # once the manifest file is generated, no more entries can be added
+        self.closed = True
+        min_sequence_number = self._min_data_sequence_number or UNASSIGNED_SEQ
+        return ManifestFile(
+            manifest_path=self._output_file.location,
+            manifest_length=len(self._writer.output_file),
+            partition_spec_id=self._spec.spec_id,
+            content=self.content(),
+            sequence_number=UNASSIGNED_SEQ,
+            min_sequence_number=min_sequence_number,
+            added_snapshot_id=self._snapshot_id,
+            added_files_count=self._added_files,
+            existing_files_count=self._existing_files,
+            deleted_files_count=self._deleted_files,
+            added_rows_count=self._added_rows,
+            existing_rows_count=self._existing_rows,
+            deleted_rows_count=self._deleted_rows,
+            partitions=construct_partition_summaries(self._spec, self._schema, 
self._partitions),
+            key_metadatas=None,
+        )
+
+    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
+        if self.closed:
+            raise RuntimeError("Cannot add entry to closed manifest writer")
+        if entry.status == ManifestEntryStatus.ADDED:
+            self._added_files += 1
+            self._added_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.EXISTING:
+            self._existing_files += 1
+            self._existing_rows += entry.data_file.record_count
+        elif entry.status == ManifestEntryStatus.DELETED:
+            self._deleted_files += 1
+            self._deleted_rows += entry.data_file.record_count
+
+        self._partitions.append(entry.data_file.partition)
+
+        if (
+            (entry.status == ManifestEntryStatus.ADDED or entry.status == 
ManifestEntryStatus.EXISTING)
+            and entry.data_sequence_number is not None
+            and (self._min_data_sequence_number is None or 
entry.data_sequence_number < self._min_data_sequence_number)
+        ):
+            self._min_data_sequence_number = entry.data_sequence_number
+
+        self._writer.write_block([self.prepare_entry(entry)])
+        return self
+
+
+class ManifestWriterV1(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "1",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
+        v1_data_file_type = 
data_file_with_partition(self._spec.partition_type(self._schema), 
format_version=1)
+        v1_manifest_entry_schema = 
manifest_entry_schema_with_data_file(v1_data_file_type)
+        return AvroOutputFile[ManifestEntry](self._output_file, 
v1_manifest_entry_schema, "manifest_entry", self._meta)
+
+    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
+        wrapped_entry = ManifestEntry(*entry.record_fields())
+        wrapped_entry.data_file.block_size_in_bytes = DEFAULT_BLOCK_SIZE
+        return wrapped_entry
+
+
+class ManifestWriterV2(ManifestWriter):
+    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: 
OutputFile, snapshot_id: int):
+        super().__init__(
+            spec,
+            schema,
+            output_file,
+            snapshot_id,
+            {
+                "schema": schema.json(),
+                "partition-spec": spec.json(),
+                "partition-spec-id": str(spec.spec_id),
+                "format-version": "2",
+                "content": "data",
+            },
+        )
+
+    def content(self) -> ManifestContent:
+        return ManifestContent.DATA
+
+    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
+        v2_data_file_type = 
data_file_with_partition(self._spec.partition_type(self._schema), 
format_version=2)
+        v2_manifest_entry_schema = 
manifest_entry_schema_with_data_file(v2_data_file_type)
+        return AvroOutputFile[ManifestEntry](self._output_file, 
v2_manifest_entry_schema, "manifest_entry", self._meta)
+
+    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
+        if entry.data_sequence_number is None:
+            if entry.snapshot_id is not None and entry.snapshot_id != 
self._snapshot_id:
+                raise ValueError(f"Found unassigned sequence number for an 
entry from snapshot: {entry.snapshot_id}")
+            if entry.status != ManifestEntryStatus.ADDED:
+                raise ValueError("Only entries with status ADDED can have null 
sequence number")
+        # In v2, we should not write block_size_in_bytes field
+        wrapped_data_file_v2_debug = DataFile(
+            format_version=2,
+            content=entry.data_file.content,
+            file_path=entry.data_file.file_path,
+            file_format=entry.data_file.file_format,
+            partition=entry.data_file.partition,
+            record_count=entry.data_file.record_count,
+            file_size_in_bytes=entry.data_file.file_size_in_bytes,
+            column_sizes=entry.data_file.column_sizes,
+            value_counts=entry.data_file.value_counts,
+            null_value_counts=entry.data_file.null_value_counts,
+            nan_value_counts=entry.data_file.nan_value_counts,
+            lower_bounds=entry.data_file.lower_bounds,
+            upper_bounds=entry.data_file.upper_bounds,
+            key_metadata=entry.data_file.key_metadata,
+            split_offsets=entry.data_file.split_offsets,
+            equality_ids=entry.data_file.equality_ids,
+            sort_order_id=entry.data_file.sort_order_id,
+            spec_id=entry.data_file.spec_id,
+        )
+        wrapped_entry = ManifestEntry(
+            status=entry.status,
+            snapshot_id=entry.snapshot_id,
+            data_sequence_number=entry.data_sequence_number,
+            file_sequence_number=entry.file_sequence_number,
+            data_file=wrapped_data_file_v2_debug,
+        )
+        return wrapped_entry
+
+
+def write_manifest(
+    format_version: Literal[1, 2], spec: PartitionSpec, schema: Schema, 
output_file: OutputFile, snapshot_id: int
+) -> ManifestWriter:
+    if format_version == 1:
+        return ManifestWriterV1(spec, schema, output_file, snapshot_id)
+    elif format_version == 2:
+        return ManifestWriterV2(spec, schema, output_file, snapshot_id)
+    else:
+        raise ValueError(f"Cannot write manifest for table version: 
{format_version}")
+
+
+class ManifestListWriter(ABC):
+    _output_file: OutputFile
+    _meta: Dict[str, str]
+    _manifest_files: List[ManifestFile]
+    _commit_snapshot_id: int
+    _writer: AvroOutputFile[ManifestFile]
+
+    def __init__(self, output_file: OutputFile, meta: Dict[str, str]):
+        self._output_file = output_file
+        self._meta = meta
+        self._manifest_files = []
+
+    def __enter__(self) -> ManifestListWriter:
+        """Open the writer for writing."""
+        self._writer = AvroOutputFile[ManifestFile](self._output_file, 
MANIFEST_FILE_SCHEMA, "manifest_file", self._meta)
+        self._writer.__enter__()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> None:
+        """Close the writer."""
+        self._writer.__exit__(exc_type, exc_value, traceback)
+        return
+
+    @abstractmethod
+    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        ...
+
+    def add_manifests(self, manifest_files: List[ManifestFile]) -> 
ManifestListWriter:
+        self._writer.write_block([self.prepare_manifest(manifest_file) for 
manifest_file in manifest_files])
+        return self
+
+
+class ManifestListWriterV1(ManifestListWriter):
+    def __init__(self, output_file: OutputFile, snapshot_id: int, 
parent_snapshot_id: int):
+        super().__init__(
+            output_file, {"snapshot-id": str(snapshot_id), 
"parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}
+        )
+
+    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        if manifest_file.content != ManifestContent.DATA:
+            raise ValidationError("Cannot store delete manifests in a v1 
table")
+        return manifest_file
+
+
+class ManifestListWriterV2(ManifestListWriter):
+    _commit_snapshot_id: int
+    _sequence_number: int
+
+    def __init__(self, output_file: OutputFile, snapshot_id: int, 
parent_snapshot_id: int, sequence_number: int):
+        super().__init__(
+            output_file,
+            {
+                "snapshot-id": str(snapshot_id),
+                "parent-snapshot-id": str(parent_snapshot_id),
+                "sequence-number": str(sequence_number),
+                "format-version": "2",
+            },
+        )
+        self._commit_snapshot_id = snapshot_id
+        self._sequence_number = sequence_number
+
+    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
+        wrapped_manifest_file = ManifestFile(*manifest_file.record_fields())
+
+        if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
+            # if the sequence number is being assigned here, then the manifest 
must be created by the current operation.
+            # To validate this, check that the snapshot id matches the current 
commit
+            if self._commit_snapshot_id != 
wrapped_manifest_file.added_snapshot_id:
+                raise ValueError(
+                    f"Found unassigned sequence number for a manifest from 
snapshot: {wrapped_manifest_file.added_snapshot_id}"
+                )
+            wrapped_manifest_file.sequence_number = self._sequence_number
+
+        if wrapped_manifest_file.min_sequence_number == UNASSIGNED_SEQ:
+            if self._commit_snapshot_id != 
wrapped_manifest_file.added_snapshot_id:
+                raise ValueError(
+                    f"Found unassigned sequence number for a manifest from 
snapshot: {wrapped_manifest_file.added_snapshot_id}"
+                )
+            # if the min sequence number is not determined, then there was no 
assigned sequence number for any file
+            # written to the wrapped manifest. Replace the unassigned sequence 
number with the one for this commit
+            wrapped_manifest_file.min_sequence_number = self._sequence_number
+        return wrapped_manifest_file
+
+
+def write_manifest_list(
+    format_version: Literal[1, 2], output_file: OutputFile, snapshot_id: int, 
parent_snapshot_id: int, sequence_number: int
+) -> ManifestListWriter:
+    if format_version == 1:
+        return ManifestListWriterV1(output_file, snapshot_id, 
parent_snapshot_id)
+    elif format_version == 2:
+        return ManifestListWriterV2(output_file, snapshot_id, 
parent_snapshot_id, sequence_number)
+    else:
+        raise ValueError(f"Cannot write manifest list for table version: 
{format_version}")
diff --git a/python/pyiceberg/typedef.py b/python/pyiceberg/typedef.py
index bb9a438a03..ff2a6d1cb0 100644
--- a/python/pyiceberg/typedef.py
+++ b/python/pyiceberg/typedef.py
@@ -195,4 +195,5 @@ class Record(StructProtocol):
         return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' 
for key, value in self.__dict__.items() if not key.startswith('_'))}]"
 
     def record_fields(self) -> List[str]:
+        """Return values of all the fields of the Record class except those 
specified in skip_fields."""
         return [self.__getattribute__(v) if hasattr(self, v) else None for v 
in self._position_to_field_name]
diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py
index 2738770492..e9dcc7eca1 100644
--- a/python/tests/avro/test_file.py
+++ b/python/tests/avro/test_file.py
@@ -124,6 +124,7 @@ def 
test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None:
         partition=Record(),
         record_count=131327,
         file_size_in_bytes=220669226,
+        block_size_in_bytes=67108864,
         column_sizes={1: 220661854},
         value_counts={1: 131327},
         null_value_counts={1: 0},
diff --git a/python/tests/test_integration_manifest.py 
b/python/tests/test_integration_manifest.py
new file mode 100644
index 0000000000..34b20f271d
--- /dev/null
+++ b/python/tests/test_integration_manifest.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import inspect
+from enum import Enum
+from tempfile import TemporaryDirectory
+from typing import Any
+
+import pytest
+from fastavro import reader
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.io.pyarrow import PyArrowFileIO
+from pyiceberg.manifest import (
+    DataFile,
+    ManifestEntry,
+    write_manifest,
+)
+from pyiceberg.table import Table
+from pyiceberg.utils.lazydict import LazyDict
+
+
+# helper function to serialize our objects to dicts to enable
+# direct comparison with the dicts returned by fastavro
+def todict(obj: Any) -> Any:
+    if isinstance(obj, dict) or isinstance(obj, LazyDict):
+        data = []
+        for k, v in obj.items():
+            data.append({"key": k, "value": v})
+        return data
+    elif isinstance(obj, Enum):
+        return obj.value
+    elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not 
isinstance(obj, bytes):
+        return [todict(v) for v in obj]
+    elif hasattr(obj, "__dict__"):
+        return {key: todict(value) for key, value in inspect.getmembers(obj) 
if not callable(value) and not key.startswith("_")}
+    else:
+        return obj
+
+
+@pytest.fixture()
+def catalog() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
+@pytest.fixture()
+def table_test_all_types(catalog: Catalog) -> Table:
+    return catalog.load_table("default.test_all_types")
+
+
+@pytest.mark.integration
+def test_write_sample_manifest(table_test_all_types: Table) -> None:
+    test_snapshot = table_test_all_types.current_snapshot()
+    if test_snapshot is None:
+        raise ValueError("Table has no current snapshot, check the docker 
environment")
+    io = table_test_all_types.io
+    test_manifest_file = test_snapshot.manifests(io)[0]
+    test_manifest_entries = test_manifest_file.fetch_manifest_entry(io)
+    entry = test_manifest_entries[0]
+    test_schema = table_test_all_types.schema()
+    test_spec = table_test_all_types.spec()
+    wrapped_data_file_v2_debug = DataFile(
+        format_version=2,
+        content=entry.data_file.content,
+        file_path=entry.data_file.file_path,
+        file_format=entry.data_file.file_format,
+        partition=entry.data_file.partition,
+        record_count=entry.data_file.record_count,
+        file_size_in_bytes=entry.data_file.file_size_in_bytes,
+        column_sizes=entry.data_file.column_sizes,
+        value_counts=entry.data_file.value_counts,
+        null_value_counts=entry.data_file.null_value_counts,
+        nan_value_counts=entry.data_file.nan_value_counts,
+        lower_bounds=entry.data_file.lower_bounds,
+        upper_bounds=entry.data_file.upper_bounds,
+        key_metadata=entry.data_file.key_metadata,
+        split_offsets=entry.data_file.split_offsets,
+        equality_ids=entry.data_file.equality_ids,
+        sort_order_id=entry.data_file.sort_order_id,
+        spec_id=entry.data_file.spec_id,
+    )
+    wrapped_entry_v2 = ManifestEntry(*entry.record_fields())
+    wrapped_entry_v2.data_file = wrapped_data_file_v2_debug
+    with TemporaryDirectory() as tmpdir:
+        tmp_avro_file = tmpdir + "/test_write_manifest.avro"
+        output = PyArrowFileIO().new_output(tmp_avro_file)
+        with write_manifest(
+            format_version=2,
+            spec=test_spec,
+            schema=test_schema,
+            output_file=output,
+            snapshot_id=test_snapshot.snapshot_id,
+        ) as manifest_writer:
+            # For simplicity, try one entry first
+            manifest_writer.add_entry(test_manifest_entries[0])
+
+        with open(tmp_avro_file, "rb") as fo:
+            r = reader(fo=fo)
+            it = iter(r)
+            fa_entry = next(it)
+
+            assert fa_entry == todict(wrapped_entry_v2)
diff --git a/python/tests/utils/test_manifest.py 
b/python/tests/utils/test_manifest.py
index 76a4a8a2b4..41af844bba 100644
--- a/python/tests/utils/test_manifest.py
+++ b/python/tests/utils/test_manifest.py
@@ -14,6 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+# pylint: disable=redefined-outer-name,arguments-renamed,fixme
+from tempfile import TemporaryDirectory
+from typing import Dict
+
+import fastavro
+import pytest
 
 from pyiceberg.io import load_file_io
 from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -26,9 +32,25 @@ from pyiceberg.manifest import (
     ManifestFile,
     PartitionFieldSummary,
     read_manifest_list,
+    write_manifest,
+    write_manifest_list,
 )
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
 from pyiceberg.table import Snapshot
 from pyiceberg.table.snapshots import Operation, Summary
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import Record
+from pyiceberg.types import IntegerType, NestedField
+
+
+def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None:
+    """Assert that the Avro file header contains every expected metadata pair.
+
+    Args:
+        avro_file: Path of the Avro file to open in binary mode.
+        expected_metadata: Key/value pairs that must appear in the metadata
+            exposed by the fastavro reader; extra keys in the file are ignored.
+    """
+    with open(avro_file, "rb") as avro_stream:
+        header_metadata = fastavro.reader(avro_stream).metadata
+        for key, expected_value in expected_metadata.items():
+            assert key in header_metadata
+            assert header_metadata[key] == expected_value
 
 
 def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
@@ -278,3 +300,245 @@ def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
     assert entry.file_sequence_number == 3
     assert entry.snapshot_id == 8744736658442914487
     assert entry.status == ManifestEntryStatus.ADDED
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_manifest(generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: int) -> None:
+    """Write manifest entries with ``write_manifest`` and verify what is read back.
+
+    The entries of the first manifest listed by the snapshot are written to a
+    temporary Avro file. The test then checks the Avro header metadata and the
+    first entry fetched through the ``ManifestFile`` returned by the writer.
+
+    Args:
+        generated_manifest_file_file_v1: Passed as the snapshot's ``manifest_list`` when ``format_version`` is 1.
+        generated_manifest_file_file_v2: Passed as the snapshot's ``manifest_list`` when ``format_version`` is 2.
+        format_version: Format version under test (parametrized over 1 and 2).
+    """
+    io = load_file_io()
+    snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        timestamp_ms=1602638573590,
+        manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
+    demo_manifest_file = snapshot.manifests(io)[0]
+    manifest_entries = demo_manifest_file.fetch_manifest_entry(io)
+    test_schema = Schema(
+        NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False)
+    )
+    test_spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
+        PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
+        spec_id=demo_manifest_file.partition_spec_id,
+    )
+    with TemporaryDirectory() as tmpdir:
+        tmp_avro_file = tmpdir + "/test_write_manifest.avro"
+        output = io.new_output(tmp_avro_file)
+        with write_manifest(
+            format_version=format_version,  # type: ignore
+            spec=test_spec,
+            schema=test_schema,
+            output_file=output,
+            snapshot_id=8744736658442914487,
+        ) as writer:
+            for entry in manifest_entries:
+                writer.add_entry(entry)
+            new_manifest = writer.to_manifest_file()
+            # After the manifest file has been produced, adding entries must be rejected.
+            with pytest.raises(RuntimeError):
+                writer.add_entry(manifest_entries[0])
+
+        expected_metadata = {
+            "schema": test_schema.json(),
+            "partition-spec": test_spec.json(),
+            "partition-spec-id": str(test_spec.spec_id),
+            "format-version": str(format_version),
+        }
+        # The "content" header key is only expected for format version 2.
+        if format_version == 2:
+            expected_metadata["content"] = "data"
+        _verify_metadata_with_fastavro(
+            tmp_avro_file,
+            expected_metadata,
+        )
+        new_manifest_entries = new_manifest.fetch_manifest_entry(io)
+
+        manifest_entry = new_manifest_entries[0]
+
+        assert manifest_entry.status == ManifestEntryStatus.ADDED
+        assert manifest_entry.snapshot_id == 8744736658442914487
+        # Parenthesised on purpose: a conditional expression binds looser than ``==``,
+        # so without the parentheses the v2 case would assert the constant 3.
+        assert manifest_entry.data_sequence_number == (0 if format_version == 1 else 3)
+        assert isinstance(manifest_entry.data_file, DataFile)
+
+        data_file = manifest_entry.data_file
+
+        assert data_file.content is DataFileContent.DATA
+        assert (
+            data_file.file_path
+            == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
+        )
+        assert data_file.file_format == FileFormat.PARQUET
+        assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
+        assert data_file.record_count == 19513
+        assert data_file.file_size_in_bytes == 388872
+        if format_version == 1:
+            assert data_file.block_size_in_bytes == 67108864
+        else:
+            assert data_file.block_size_in_bytes is None
+        assert data_file.column_sizes == {
+            1: 53,
+            2: 98153,
+            3: 98693,
+            4: 53,
+            5: 53,
+            6: 53,
+            7: 17425,
+            8: 18528,
+            9: 53,
+            10: 44788,
+            11: 35571,
+            12: 53,
+            13: 1243,
+            14: 2355,
+            15: 12750,
+            16: 4029,
+            17: 110,
+            18: 47194,
+            19: 2948,
+        }
+        assert data_file.value_counts == {
+            1: 19513,
+            2: 19513,
+            3: 19513,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 19513,
+            8: 19513,
+            9: 19513,
+            10: 19513,
+            11: 19513,
+            12: 19513,
+            13: 19513,
+            14: 19513,
+            15: 19513,
+            16: 19513,
+            17: 19513,
+            18: 19513,
+            19: 19513,
+        }
+        assert data_file.null_value_counts == {
+            1: 19513,
+            2: 0,
+            3: 0,
+            4: 19513,
+            5: 19513,
+            6: 19513,
+            7: 0,
+            8: 0,
+            9: 19513,
+            10: 0,
+            11: 0,
+            12: 19513,
+            13: 0,
+            14: 0,
+            15: 0,
+            16: 0,
+            17: 0,
+            18: 0,
+            19: 0,
+        }
+        assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
+        assert data_file.lower_bounds == {
+            2: b"2020-04-01 00:00",
+            3: b"2020-04-01 00:12",
+            7: b"\x03\x00\x00\x00",
+            8: b"\x01\x00\x00\x00",
+            10: b"\xf6(\\\x8f\xc2\x05S\xc0",
+            11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
+            15: b")\\\x8f\xc2\xf5(\x08\xc0",
+            16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+            18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
+            19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
+        }
+        assert data_file.upper_bounds == {
+            2: b"2020-04-30 23:5:",
+            3: b"2020-05-01 00:41",
+            7: b"\t\x01\x00\x00",
+            8: b"\t\x01\x00\x00",
+            10: b"\xcd\xcc\xcc\xcc\xcc,_@",
+            11: b"\x1f\x85\xebQ\\\xe2\xfe@",
+            13: b"\x00\x00\x00\x00\x00\x00\x12@",
+            14: b"\x00\x00\x00\x00\x00\x00\xe0?",
+            15: b"q=\n\xd7\xa3\xf01@",
+            16: b"\x00\x00\x00\x00\x00`B@",
+            17: b"333333\xd3?",
+            18: b"\x00\x00\x00\x00\x00\x18b@",
+            19: b"\x00\x00\x00\x00\x00\x00\x04@",
+        }
+        assert data_file.key_metadata is None
+        assert data_file.split_offsets == [4]
+        assert data_file.equality_ids is None
+        assert data_file.sort_order_id == 0
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_manifest_list(
+    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: int
+) -> None:
+    """Write a manifest list with ``write_manifest_list`` and verify what is read back.
+
+    The manifests listed by the snapshot are written to a temporary Avro file.
+    The test then checks the Avro header metadata, the first ``ManifestFile``
+    read back with ``read_manifest_list``, its first partition summary, and the
+    first manifest entry fetched through it.
+
+    Args:
+        generated_manifest_file_file_v1: Passed as the snapshot's ``manifest_list`` when ``format_version`` is 1.
+        generated_manifest_file_file_v2: Passed as the snapshot's ``manifest_list`` when ``format_version`` is 2.
+        format_version: Format version under test (parametrized over 1 and 2).
+    """
+    io = load_file_io()
+
+    snapshot = Snapshot(
+        snapshot_id=25,
+        parent_snapshot_id=19,
+        timestamp_ms=1602638573590,
+        manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2,
+        summary=Summary(Operation.APPEND),
+        schema_id=3,
+    )
+
+    demo_manifest_list = snapshot.manifests(io)
+    with TemporaryDirectory() as tmp_dir:
+        path = tmp_dir + "/manifest-list.avro"
+        output = io.new_output(path)
+        with write_manifest_list(
+            format_version=format_version, output_file=output, snapshot_id=25, parent_snapshot_id=19, sequence_number=0  # type: ignore
+        ) as writer:
+            writer.add_manifests(demo_manifest_list)
+        new_manifest_list = list(read_manifest_list(io.new_input(path)))
+
+        expected_metadata = {"snapshot-id": "25", "parent-snapshot-id": "19", "format-version": str(format_version)}
+        # The "sequence-number" header key is only expected for format version 2.
+        if format_version == 2:
+            expected_metadata["sequence-number"] = "0"
+        _verify_metadata_with_fastavro(path, expected_metadata)
+
+        manifest_file = new_manifest_list[0]
+
+        assert manifest_file.manifest_length == 7989
+        assert manifest_file.partition_spec_id == 0
+        # The version-dependent expectations below are parenthesised on purpose: a
+        # conditional expression binds looser than ``==``, so without the parentheses
+        # the v2 case would assert a truthy constant instead of comparing.
+        assert manifest_file.content == (ManifestContent.DATA if format_version == 1 else ManifestContent.DELETES)
+        assert manifest_file.sequence_number == (0 if format_version == 1 else 3)
+        assert manifest_file.min_sequence_number == (0 if format_version == 1 else 3)
+        assert manifest_file.added_snapshot_id == 9182715666859759686
+        assert manifest_file.added_files_count == 3
+        assert manifest_file.existing_files_count == 0
+        assert manifest_file.deleted_files_count == 0
+        assert manifest_file.added_rows_count == 237993
+        assert manifest_file.existing_rows_count == 0
+        assert manifest_file.deleted_rows_count == 0
+        assert manifest_file.key_metadata is None
+
+        assert isinstance(manifest_file.partitions, list)
+
+        partition = manifest_file.partitions[0]
+
+        assert isinstance(partition, PartitionFieldSummary)
+
+        assert partition.contains_null is True
+        assert partition.contains_nan is False
+        assert partition.lower_bound == b"\x01\x00\x00\x00"
+        assert partition.upper_bound == b"\x02\x00\x00\x00"
+
+        entries = manifest_file.fetch_manifest_entry(io)
+
+        assert isinstance(entries, list)
+
+        entry = entries[0]
+
+        assert entry.data_sequence_number == (0 if format_version == 1 else 3)
+        assert entry.file_sequence_number == (0 if format_version == 1 else 3)
+        assert entry.snapshot_id == 8744736658442914487
+        assert entry.status == ManifestEntryStatus.ADDED


Reply via email to