rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r909057161


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from iceberg.avro.reader import (
+    CastReader,
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    DecimalType,
+    DoubleType,
+    FloatType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(file_schema: Union[Schema, IcebergType], read_schema: 
Union[Schema, IcebergType]) -> Reader:
+    """This resolves the file and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         file_schema (Schema | IcebergType): The schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which 
is equal, subset or superset of the file schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object 
type
+    """
+    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+
+
[email protected](Schema)
+def _(file_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(file_schema.as_struct(), read_schema.as_struct())
+
+
[email protected](StructType)
+def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
+    """Iterates over the file schema, and checks if the field is in the read 
schema"""
+
+    if not isinstance(read_struct, StructType):
+        raise ResolveException(f"File/read schema are not aligned for 
{file_struct}, got {read_struct}")
+
+    results: List[Tuple[Optional[int], Reader]] = []
+    read_fields = {field.field_id: (pos, field) for pos, field in 
enumerate(read_struct.fields)}
+
+    for file_field in file_struct.fields:
+        if file_field.field_id in read_fields:
+            read_pos, read_field = read_fields[file_field.field_id]
+            result_reader = resolve(file_field.field_type, 
read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(file_field.field_type, ConstructReader())
+        result_reader = result_reader if file_field.required else 
OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    file_fields = {field.field_id: field for field in file_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.field_id not in file_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is non-optional, and not 
part of the file schema")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
[email protected](ListType)
+def _(file_list: ListType, read_list: IcebergType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"File/read schema are not aligned for 
{file_list}, got {read_list}")
+    element_reader = resolve(file_list.element_type, read_list.element_type)
+    return ListReader(element_reader)
+
+
[email protected](MapType)
+def _(file_map: MapType, read_map: IcebergType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"File/read schema are not aligned for 
{file_map}, got {read_map}")
+    key_reader = resolve(file_map.key_type, read_map.key_type)
+    value_reader = resolve(file_map.value_type, read_map.value_type)
+
+    return MapReader(key_reader, value_reader)
+
+
[email protected](PrimitiveType)
+def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
+    """Converting the primitive type into an actual reader that will decode 
the physical data"""
+    if not isinstance(read_type, PrimitiveType):
+        raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+    # In the case of a promotion, we want to check if it is valid
+    if file_type != read_type:
+        return promote(file_type, read_type)
+    return primitive_reader(read_type)
+
+
+@singledispatch
+def promote(file_type: IcebergType, read_type: IcebergType) -> Reader:
+    """Promotes reading a file type to a read type
+
+    Args:
+        file_type (IcebergType): The type of the Avro file
+        read_type (IcebergType): The requested read type
+
+    Raises:
+        ResolveException: If attempting to resolve an unrecognized object type
+    """
+    raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+
[email protected](IntegerType)
+def _(file_type: IntegerType, read_type: IcebergType) -> Reader:
+    if isinstance(read_type, LongType):
+        # Ints/Longs are binary compatible in Avro, so this is okay
+        return primitive_reader(read_type)
+    elif type(read_type) in {FloatType, DoubleType}:
+        # We should just read the int, and convert it to a float
+        return CastReader(primitive_reader(file_type), float)
+    else:
+        raise ResolveException(f"Cannot promote an int to {read_type}")
+
+
[email protected](LongType)
+def _(file_type: LongType, read_type: IcebergType) -> Reader:
+    if type(read_type) in {FloatType, DoubleType}:

Review Comment:
   These aren't allowed promotions in Iceberg, so I think we should remove them.



##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from iceberg.avro.reader import (
+    CastReader,
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    DecimalType,
+    DoubleType,
+    FloatType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(file_schema: Union[Schema, IcebergType], read_schema: 
Union[Schema, IcebergType]) -> Reader:
+    """This resolves the file and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         file_schema (Schema | IcebergType): The schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which 
is equal, subset or superset of the file schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object 
type
+    """
+    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+
+
[email protected](Schema)
+def _(file_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(file_schema.as_struct(), read_schema.as_struct())
+
+
[email protected](StructType)
+def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
+    """Iterates over the file schema, and checks if the field is in the read 
schema"""
+
+    if not isinstance(read_struct, StructType):
+        raise ResolveException(f"File/read schema are not aligned for 
{file_struct}, got {read_struct}")
+
+    results: List[Tuple[Optional[int], Reader]] = []
+    read_fields = {field.field_id: (pos, field) for pos, field in 
enumerate(read_struct.fields)}
+
+    for file_field in file_struct.fields:
+        if file_field.field_id in read_fields:
+            read_pos, read_field = read_fields[file_field.field_id]
+            result_reader = resolve(file_field.field_type, 
read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(file_field.field_type, ConstructReader())
+        result_reader = result_reader if file_field.required else 
OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    file_fields = {field.field_id: field for field in file_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.field_id not in file_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is non-optional, and not 
part of the file schema")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
[email protected](ListType)
+def _(file_list: ListType, read_list: IcebergType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"File/read schema are not aligned for 
{file_list}, got {read_list}")
+    element_reader = resolve(file_list.element_type, read_list.element_type)
+    return ListReader(element_reader)
+
+
[email protected](MapType)
+def _(file_map: MapType, read_map: IcebergType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"File/read schema are not aligned for 
{file_map}, got {read_map}")
+    key_reader = resolve(file_map.key_type, read_map.key_type)
+    value_reader = resolve(file_map.value_type, read_map.value_type)
+
+    return MapReader(key_reader, value_reader)
+
+
[email protected](PrimitiveType)
+def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
+    """Converting the primitive type into an actual reader that will decode 
the physical data"""
+    if not isinstance(read_type, PrimitiveType):
+        raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+    # In the case of a promotion, we want to check if it is valid
+    if file_type != read_type:
+        return promote(file_type, read_type)
+    return primitive_reader(read_type)
+
+
+@singledispatch
+def promote(file_type: IcebergType, read_type: IcebergType) -> Reader:
+    """Promotes reading a file type to a read type
+
+    Args:
+        file_type (IcebergType): The type of the Avro file
+        read_type (IcebergType): The requested read type
+
+    Raises:
+        ResolveException: If attempting to resolve an unrecognized object type
+    """
+    raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+
[email protected](IntegerType)
+def _(file_type: IntegerType, read_type: IcebergType) -> Reader:
+    if isinstance(read_type, LongType):
+        # Ints/Longs are binary compatible in Avro, so this is okay
+        return primitive_reader(read_type)
+    elif type(read_type) in {FloatType, DoubleType}:

Review Comment:
   These aren't allowed in Iceberg.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to