Fokko commented on code in PR #6997:
URL: https://github.com/apache/iceberg/pull/6997#discussion_r1132896209
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -484,6 +493,204 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results: List[Optional[IcebergType]] = []
+ for i, _ in enumerate(schema.names):
+ field = schema.field(i)
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+ return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ """A generic function for applying a pyarrow schema visitor to any point
within a schema
+
+ The function traverses the schema in post-order fashion
+
+ Args:
+ obj(Schema | pa.DataType): An instance of a Schema or an IcebergType
+ visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of
the generic PyarrowSchemaVisitor base class
+
+ Raises:
+ NotImplementedError: If attempting to visit an unrecognized object type
+ """
+ raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ struct_results: List[Optional[T]] = []
+ for field in obj:
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+
+ return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+ visitor.before_list_element(obj.value_field)
+ list_result = visit_pyarrow(obj.value_field.type, visitor)
+ visitor.after_list_element(obj.value_field)
+ return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+ visitor.before_map_key(obj.key_field)
+ key_result = visit_pyarrow(obj.key_field.type, visitor)
+ visitor.after_map_key(obj.key_field)
+ visitor.before_map_value(obj.item_field)
+ value_result = visit_pyarrow(obj.item_field.type, visitor)
+ visitor.after_map_value(obj.item_field)
+ return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ if pa.types.is_nested(obj):
+ raise TypeError(f"Expected primitive type, got {type(obj)}")
+ return visitor.primitive(obj)
+
+
+class PyArrowSchemaVisitor(Generic[T], ABC):
+ def before_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a field."""
+
+ def after_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a field."""
+
+ def before_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a list element."""
+
+ def after_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a list element."""
+
+ def before_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map key."""
+
+ def after_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map key."""
+
+ def before_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map value."""
+
+ def after_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map value."""
+
+ @abstractmethod
+ def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) ->
Schema:
+ """visit a schema"""
+
+ @abstractmethod
+ def struct(self, struct: pa.StructType, field_results: List[Optional[T]])
-> T:
+ """visit a struct"""
+
+ @abstractmethod
+ def list(self, list_type: pa.ListType, element_result: Optional[T]) ->
Optional[T]:
+ """visit a list"""
+
+ @abstractmethod
+ def map(self, map_type: pa.MapType, key_result: Optional[T], value_result:
Optional[T]) -> Optional[T]:
+ """visit a map"""
+
+ @abstractmethod
+ def primitive(self, primitive: pa.DataType) -> T:
+ """visit a primitive type"""
+
+
+def _get_field_id_and_doc(field: pa.Field) -> Tuple[Optional[int],
Optional[str]]:
+ field_metadata = {k.decode(): v.decode() for k, v in
field.metadata.items()}
+ field_id = None
+ doc = field_metadata.get("PARQUET:doc", field_metadata.get("doc"))
+
+ if field_id_str := field_metadata.get("PARQUET:field_id",
field_metadata.get("field_id")):
Review Comment:
Nit: I would move `"PARQUET:field_id"` into a constant
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -484,6 +493,204 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results: List[Optional[IcebergType]] = []
+ for i, _ in enumerate(schema.names):
+ field = schema.field(i)
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+ return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ """A generic function for applying a pyarrow schema visitor to any point
within a schema
+
+ The function traverses the schema in post-order fashion
+
+ Args:
+ obj(Schema | pa.DataType): An instance of a Schema or an IcebergType
+ visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of
the generic PyarrowSchemaVisitor base class
+
+ Raises:
+ NotImplementedError: If attempting to visit an unrecognized object type
+ """
+ raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ struct_results: List[Optional[T]] = []
+ for field in obj:
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+
+ return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+ visitor.before_list_element(obj.value_field)
+ list_result = visit_pyarrow(obj.value_field.type, visitor)
+ visitor.after_list_element(obj.value_field)
+ return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+ visitor.before_map_key(obj.key_field)
+ key_result = visit_pyarrow(obj.key_field.type, visitor)
+ visitor.after_map_key(obj.key_field)
+ visitor.before_map_value(obj.item_field)
+ value_result = visit_pyarrow(obj.item_field.type, visitor)
+ visitor.after_map_value(obj.item_field)
+ return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ if pa.types.is_nested(obj):
+ raise TypeError(f"Expected primitive type, got {type(obj)}")
+ return visitor.primitive(obj)
+
+
+class PyArrowSchemaVisitor(Generic[T], ABC):
+ def before_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a field."""
+
+ def after_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a field."""
+
+ def before_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a list element."""
+
+ def after_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a list element."""
+
+ def before_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map key."""
+
+ def after_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map key."""
+
+ def before_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map value."""
+
+ def after_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map value."""
+
+ @abstractmethod
+ def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) ->
Schema:
+ """visit a schema"""
+
+ @abstractmethod
+ def struct(self, struct: pa.StructType, field_results: List[Optional[T]])
-> T:
+ """visit a struct"""
+
+ @abstractmethod
+ def list(self, list_type: pa.ListType, element_result: Optional[T]) ->
Optional[T]:
+ """visit a list"""
+
+ @abstractmethod
+ def map(self, map_type: pa.MapType, key_result: Optional[T], value_result:
Optional[T]) -> Optional[T]:
+ """visit a map"""
+
+ @abstractmethod
+ def primitive(self, primitive: pa.DataType) -> T:
+ """visit a primitive type"""
+
+
+def _get_field_id_and_doc(field: pa.Field) -> Tuple[Optional[int],
Optional[str]]:
+ field_metadata = {k.decode(): v.decode() for k, v in
field.metadata.items()}
+ field_id = None
+ doc = field_metadata.get("PARQUET:doc", field_metadata.get("doc"))
+
+ if field_id_str := field_metadata.get("PARQUET:field_id",
field_metadata.get("field_id")):
+ field_id = int(field_id_str)
+
+ return field_id, doc
+
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[IcebergType], ABC):
+ def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
+ fields = []
+ for i, _ in enumerate(schema.names):
Review Comment:
```suggestion
for i, field in enumerate(schema.fields):
```
And then we can lose the lookup on the next line.
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -576,7 +780,8 @@ def project_table(
for table in pool.starmap(
func=_file_to_table,
iterable=[(fs, task, bound_row_filter, projected_schema,
projected_field_ids, case_sensitive) for task in tasks],
- chunksize=None, # we could use this to control how to
materialize the generator of tasks (we should also make the expression above
lazy)
+ chunksize=None,
Review Comment:
Unrelated change
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -484,6 +493,204 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results: List[Optional[IcebergType]] = []
+ for i, _ in enumerate(schema.names):
+ field = schema.field(i)
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+ return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ """A generic function for applying a pyarrow schema visitor to any point
within a schema
+
+ The function traverses the schema in post-order fashion
+
+ Args:
+ obj(Schema | pa.DataType): An instance of a Schema or an IcebergType
+ visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of
the generic PyarrowSchemaVisitor base class
+
+ Raises:
+ NotImplementedError: If attempting to visit an unrecognized object type
+ """
+ raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ struct_results: List[Optional[T]] = []
+ for field in obj:
+ visitor.before_field(field)
+ struct_result = visit_pyarrow(field.type, visitor)
+ visitor.after_field(field)
+ struct_results.append(struct_result)
+
+ return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+ visitor.before_list_element(obj.value_field)
+ list_result = visit_pyarrow(obj.value_field.type, visitor)
+ visitor.after_list_element(obj.value_field)
+ return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+ visitor.before_map_key(obj.key_field)
+ key_result = visit_pyarrow(obj.key_field.type, visitor)
+ visitor.after_map_key(obj.key_field)
+ visitor.before_map_value(obj.item_field)
+ value_result = visit_pyarrow(obj.item_field.type, visitor)
+ visitor.after_map_value(obj.item_field)
+ return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
+ if pa.types.is_nested(obj):
+ raise TypeError(f"Expected primitive type, got {type(obj)}")
+ return visitor.primitive(obj)
+
+
+class PyArrowSchemaVisitor(Generic[T], ABC):
+ def before_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a field."""
+
+ def after_field(self, field: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a field."""
+
+ def before_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a list element."""
+
+ def after_list_element(self, element: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a list element."""
+
+ def before_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map key."""
+
+ def after_map_key(self, key: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map key."""
+
+ def before_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately before
visiting a map value."""
+
+ def after_map_value(self, value: pa.Field) -> None:
+ """Override this method to perform an action immediately after
visiting a map value."""
+
+ @abstractmethod
+ def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) ->
Schema:
+ """visit a schema"""
+
+ @abstractmethod
+ def struct(self, struct: pa.StructType, field_results: List[Optional[T]])
-> T:
+ """visit a struct"""
+
+ @abstractmethod
+ def list(self, list_type: pa.ListType, element_result: Optional[T]) ->
Optional[T]:
+ """visit a list"""
+
+ @abstractmethod
+ def map(self, map_type: pa.MapType, key_result: Optional[T], value_result:
Optional[T]) -> Optional[T]:
+ """visit a map"""
+
+ @abstractmethod
+ def primitive(self, primitive: pa.DataType) -> T:
+ """visit a primitive type"""
+
+
+def _get_field_id_and_doc(field: pa.Field) -> Tuple[Optional[int],
Optional[str]]:
+ field_metadata = {k.decode(): v.decode() for k, v in
field.metadata.items()}
+ field_id = None
+ doc = field_metadata.get("PARQUET:doc", field_metadata.get("doc"))
+
+ if field_id_str := field_metadata.get("PARQUET:field_id",
field_metadata.get("field_id")):
+ field_id = int(field_id_str)
+
+ return field_id, doc
+
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[IcebergType], ABC):
+ def schema(self, schema: pa.Schema, field_results:
List[Optional[IcebergType]]) -> Schema:
+ fields = []
+ for i, _ in enumerate(schema.names):
+ field = schema.field(i)
+ field_id, field_doc = _get_field_id_and_doc(field)
+ field_type = field_results[i]
+ if field_type is not None and field_id is not None:
+ fields.append(NestedField(field_id, field.name, field_type,
required=not field.nullable, doc=field_doc))
+ return Schema(*fields)
+
+ def struct(self, struct: pa.StructType, field_results:
List[Optional[IcebergType]]) -> IcebergType:
+ fields = []
+ for i in range(struct.num_fields):
Review Comment:
Checked locally, and the struct itself is iterable:
```suggestion
for i, field in enumerate(struct):
```
##########
python/tests/io/test_pyarrow.py:
##########
@@ -18,73 +18,19 @@
import os
import tempfile
-from typing import Any, List, Optional
Review Comment:
Oeeeh, thanks for splitting this. It was getting too big indeed. A lot of
rebasing for me ahead
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -484,6 +493,204 @@ def expression_to_pyarrow(expr: BooleanExpression) ->
pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+ visitor = _ConvertToIceberg()
+ struct_results: List[Optional[IcebergType]] = []
+ for i, _ in enumerate(schema.names):
Review Comment:
```suggestion
for i, field in enumerate(schema.fields):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]