Fokko commented on code in PR #4920:
URL: https://github.com/apache/iceberg/pull/4920#discussion_r897838574


##########
python/src/iceberg/avro/file.py:
##########
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=W0621
+"""
+Avro reader for reading Avro files
+"""
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from io import SEEK_SET
+
+from iceberg.avro.codecs import KNOWN_CODECS, Codec
+from iceberg.avro.decoder import BinaryDecoder
+from iceberg.avro.reader import AvroStruct, ConstructReader, StructReader
+from iceberg.io.base import InputFile, InputStream
+from iceberg.io.memory import MemoryInputStream
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    FixedType,
+    MapType,
+    NestedField,
+    StringType,
+    StructType,
+)
+from iceberg.utils.schema_conversion import AvroSchemaConversion
+
+VERSION = 1
+MAGIC = bytes(b"Obj" + bytearray([VERSION]))
+MAGIC_SIZE = len(MAGIC)
+SYNC_SIZE = 16
+META_SCHEMA = StructType(
+    NestedField(name="magic", field_id=100, field_type=FixedType(length=MAGIC_SIZE), required=True),
+    NestedField(
+        field_id=200,
+        name="meta",
+        field_type=MapType(key_id=201, key_type=StringType(), value_id=202, value_type=StringType(), value_required=True),
+        required=True,
+    ),
+    NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), required=True),
+)
+
+_CODEC_KEY = "avro.codec"
+_SCHEMA_KEY = "avro.schema"
+
+
+@dataclass(frozen=True)
+class AvroFileHeader:
+    magic: bytes
+    meta: dict[str, str]
+    sync: bytes
+
+    def compression_codec(self) -> type[Codec] | None:
+        """Get the file's compression codec algorithm from the file's metadata.
+
+        In the case of a null codec, we return a None indicating that we
+        don't need to compress/decompress
+        """
+        codec_name = self.meta.get(_CODEC_KEY, "null")
+        if codec_name not in KNOWN_CODECS:
+            raise ValueError(f"Unsupported codec: {codec_name}")
+
+        return KNOWN_CODECS[codec_name]
+
+    def get_schema(self) -> Schema:
+        if _SCHEMA_KEY in self.meta:
+            avro_schema_string = self.meta[_SCHEMA_KEY]
+            avro_schema = json.loads(avro_schema_string)
+            return AvroSchemaConversion().avro_to_iceberg(avro_schema)
+        else:
+            raise ValueError("No schema found in Avro file headers")
+
+
+@dataclass
+class Block:
+    reader: StructReader
+    block_records: int
+    block_decoder: BinaryDecoder
+    position: int = 0
+
+    def __iter__(self):
+        return self
+
+    def has_next(self) -> bool:
+        return self.position < self.block_records
+
+    def __next__(self) -> AvroStruct:
+        if self.has_next():
+            self.position += 1
+            return self.reader.read(self.block_decoder)
+        raise StopIteration
+
+
+class AvroFile:
+    input_file: InputFile
+    input_stream: InputStream
+    header: AvroFileHeader
+    schema: Schema
+    file_length: int
+    reader: StructReader
+
+    decoder: BinaryDecoder
+    block: Block | None = None
+
+    def __init__(self, input_file: InputFile) -> None:
+        self.input_file = input_file
+
+    def __enter__(self):

Review Comment:
   I'm happy to do this, but I would suggest doing it in a separate PR. We 
could also extend `InputFile` and `OutputFile` so that `__enter__` calls 
`open()` or `create()`. Having a separate iterator isn't Pythonic.
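   Something along these lines would illustrate the idea (just a rough 
sketch; `ContextInputFile` is a made-up name and not the current 
`iceberg.io.base` API):
   
   ```python
   class ContextInputFile:
       """Hypothetical wrapper showing how entering the context opens the stream."""

       def __init__(self, input_file):
           self._input_file = input_file
           self._stream = None

       def __enter__(self):
           # Open the underlying stream when entering the with-block
           self._stream = self._input_file.open()
           return self._stream

       def __exit__(self, exc_type, exc_value, traceback):
           # Always close the stream, even if the body raised
           if self._stream is not None:
               self._stream.close()
           return False  # do not suppress exceptions


   # Usage:
   # with ContextInputFile(input_file) as stream:
   #     magic = stream.read(MAGIC_SIZE)
   ```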
   
   With regard to threading, I think that's another can of worms. At least as 
a start, reading a file should not be considered thread-safe, and we should 
not share the file across threads. I would rather read the files in parallel 
using something like 
[multiprocessing](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap). 
We could also split out the blocks within an Avro file if we like. Another 
option would be to make the read 
[async](https://docs.python.org/3/library/asyncio.html), or even go for an 
[async iterator](https://peps.python.org/pep-0525/). But looking at the size 
of this PR, I think we should split that out. Let me know if you feel 
otherwise.
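   For the multiprocessing route, a rough sketch (the `read_avro_file` helper 
is hypothetical and just stands in for whatever ends up wrapping `AvroFile`; 
`Pool.map` is used here since there is only a single argument per file):
   
   ```python
   from __future__ import annotations

   from multiprocessing import Pool


   def read_avro_file(path: str) -> list:
       """Hypothetical helper: open one Avro file and return its records."""
       # e.g. open an InputFile for `path`, read the header and blocks,
       # and return the decoded records
       return []


   def read_all(paths: list[str]) -> list:
       # One task per file: each worker process opens and reads its own
       # file, so no file handle is ever shared between processes.
       with Pool() as pool:
           per_file = pool.map(read_avro_file, paths)
       # Flatten the per-file record lists into a single list
       return [record for records in per_file for record in records]
   ```
   
   And an equally rough sketch of the async-iterator idea; `read_block()` is 
a made-up awaitable here, purely to show the shape of a PEP 525 async 
generator:
   
   ```python
   async def records(avro_file):
       # Yield records block by block; assumes a hypothetical awaitable
       # avro_file.read_block() that returns None at end of file.
       while (block := await avro_file.read_block()) is not None:
           for record in block:
               yield record


   # Consumed as:
   # async for record in records(avro_file):
   #     ...
   ```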


