rdblue commented on code in PR #4920:
URL: https://github.com/apache/iceberg/pull/4920#discussion_r895220732


##########
python/src/iceberg/avro/reader.py:
##########
@@ -0,0 +1,318 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Classes for building the Reader tree
+
+Constructing a reader tree from the schema makes it easy
+to decouple the reader implementation from the schema.
+
+The reader tree can be changed in such a way that the
+read schema is different from the write schema, while
+still respecting the write schema when decoding the data
+from __future__ import annotations
+
+from abc import abstractmethod
+from dataclasses import dataclass, field
+from datetime import date, datetime, time
+from decimal import Decimal
+from functools import singledispatch
+from typing import Any
+from uuid import UUID
+
+from iceberg.avro.decoder import BinaryDecoder
+from iceberg.files import StructProtocol
+from iceberg.schema import Schema, SchemaVisitor
+from iceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
+    DoubleType,
+    FixedType,
+    FloatType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    PrimitiveType,
+    StringType,
+    StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+)
+from iceberg.utils.singleton import Singleton
+
+
+@dataclass(frozen=True)
+class AvroStruct(StructProtocol):
+    _data: list[Any | StructProtocol] = field()
+
+    def set(self, pos: int, value: Any) -> None:
+        self._data[pos] = value
+
+    def get(self, pos: int) -> Any:
+        return self._data[pos]
+
+
+class Reader(metaclass=Singleton):
+    @abstractmethod
+    def read(self, decoder: BinaryDecoder) -> Any:
+        ...
+
+
+class NoneReader(Reader):
+    def read(self, _: BinaryDecoder) -> None:
+        return None
+
+
+class BooleanReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> bool:
+        return decoder.read_boolean()
+
+
+class IntegerReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> int:
+        return decoder.read_int()
+
+
+class LongReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> int:
+        return decoder.read_long()
+
+
+class FloatReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> float:
+        return decoder.read_float()
+
+
+class DoubleReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> float:
+        return decoder.read_double()
+
+
+class DateReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> date:
+        return decoder.read_date_from_int()
+
+
+class TimeReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> time:
+        return decoder.read_time_micros_from_long()
+
+
+class TimestampReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> datetime:
+        return decoder.read_timestamp_micros_from_long()
+
+
+class TimestamptzReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> datetime:
+        return decoder.read_timestamp_micros_from_long()
+
+
+class StringReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> str:
+        return decoder.read_utf8()
+
+
+class UUIDReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> UUID:
+        return UUID(decoder.read_utf8())
+
+
+@dataclass(frozen=True)
+class FixedReader(Reader):
+    length: int = field()
+
+    def read(self, decoder: BinaryDecoder) -> bytes:
+        return decoder.read(self.length)
+
+
+class BinaryReader(Reader):
+    def read(self, decoder: BinaryDecoder) -> bytes:
+        return decoder.read_bytes()
+
+
+@dataclass(frozen=True)
+class DecimalReader(Reader):
+    precision: int = field()
+    scale: int = field()
+
+    def read(self, decoder: BinaryDecoder) -> Decimal:
+        return decoder.read_decimal_from_bytes(self.precision, self.scale)
+
+
+@dataclass(frozen=True)
+class OptionReader(Reader):
+    option: Reader = field()
+
+    def read(self, decoder: BinaryDecoder) -> Any | None:
+        # For the Iceberg spec it is required to set the default value to null
+        # From https://iceberg.apache.org/spec/#avro
+        # Optional fields must always set the Avro field default value to null.
+        #
+        # This means that null has to come first:
+        # https://avro.apache.org/docs/current/spec.html
+        # type of the default value must match the first element of the union.
+        # This is enforced in the schema conversion, which happens prior
+        # to building the reader tree
+        if decoder.read_int() > 0:
+            return self.option.read(decoder)
+        return None
+
+
+@dataclass(frozen=True)
+class StructReader(Reader):
+    fields: list[Reader] = field()
+
+    def read(self, decoder: BinaryDecoder) -> AvroStruct:
+        return AvroStruct([field.read(decoder) for field in self.fields])
+
+
+@dataclass(frozen=True)
+class ListReader(Reader):
+    element: Reader
+
+    def read(self, decoder: BinaryDecoder) -> list:
+        read_items = []
+        block_count = decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                # We ignore the block size for now
+                _ = decoder.read_long()
+            for _ in range(block_count):
+                read_items.append(self.element.read(decoder))
+            block_count = decoder.read_long()
+        return read_items
+
+
+@dataclass(frozen=True)
+class MapReader(Reader):
+    key: Reader
+    value: Reader
+
+    def read(self, decoder: BinaryDecoder) -> dict:
+        read_items = {}
+        block_count = decoder.read_long()
+        if block_count < 0:

Review Comment:
   Should this be moved into the `while` loop like it is in `ListReader`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to