TheNeuralBit commented on code in PR #22947:
URL: https://github.com/apache/beam/pull/22947#discussion_r960088478


##########
sdks/python/apache_beam/typehints/arrow_type_compatibility.py:
##########
@@ -0,0 +1,271 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for converting between Beam and Arrow schemas.
+
+For internal use only, no backward compatibility guarantees.
+"""
+
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
+import pyarrow as pa
+
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.batch import BatchConverter
+from apache_beam.typehints.row_type import RowTypeConstraint
+from apache_beam.typehints.schemas import typing_from_runner_api
+from apache_beam.typehints.schemas import typing_to_runner_api
+from apache_beam.utils import proto_utils
+
+# This module is for internal use only, so it exports no public names.
+__all__ = []
+
+# Arrow schema-metadata key under which the Beam schema id is stored.
+BEAM_SCHEMA_ID_KEY = b'beam:schema_id'
+# Prefix of Arrow metadata keys whose values are serialized Beam Option protos.
+BEAM_OPTION_KEY_PREFIX = b'beam:option:'
+
+
+def _hydrate_beam_option(encoded_option: bytes) -> schema_pb2.Option:
+  return proto_utils.parse_Bytes(encoded_option, schema_pb2.Option)
+
+
+def beam_schema_from_arrow_schema(arrow_schema: pa.Schema) -> 
schema_pb2.Schema:
+  if arrow_schema.metadata:
+    schema_id = arrow_schema.metadata.get(BEAM_SCHEMA_ID_KEY, None)
+    schema_options = [
+        _hydrate_beam_option(value) for key,
+        value in arrow_schema.metadata.items()
+        if key.startswith(BEAM_OPTION_KEY_PREFIX)
+    ]
+  else:
+    schema_id = None
+    schema_options = []
+
+  return schema_pb2.Schema(
+      fields=[
+          _beam_field_from_arrow_field(arrow_schema.field(i))
+          for i in range(len(arrow_schema.types))
+      ],
+      options=schema_options,
+      id=schema_id)
+
+
+def _beam_field_from_arrow_field(arrow_field: pa.Field) -> schema_pb2.Field:
+  beam_fieldtype = _beam_fieldtype_from_arrow_field(arrow_field)
+
+  if arrow_field.metadata:
+    field_options = [
+        _hydrate_beam_option(value) for key,
+        value in arrow_field.metadata.items()
+        if key.startswith(BEAM_OPTION_KEY_PREFIX)
+    ]
+  else:
+    field_options = None
+
+  return schema_pb2.Field(
+      name=arrow_field.name,
+      type=beam_fieldtype,
+      options=field_options,
+  )
+
+
+def _beam_fieldtype_from_arrow_field(
+    arrow_field: pa.Field) -> schema_pb2.FieldType:
+  beam_fieldtype = _beam_fieldtype_from_arrow_type(arrow_field.type)
+  beam_fieldtype.nullable = arrow_field.nullable
+
+  return beam_fieldtype
+
+
+def _beam_fieldtype_from_arrow_type(
+    arrow_type: pa.DataType) -> schema_pb2.FieldType:
+  if arrow_type in PYARROW_TO_ATOMIC_TYPE:
+    return schema_pb2.FieldType(atomic_type=PYARROW_TO_ATOMIC_TYPE[arrow_type])
+  elif isinstance(arrow_type, pa.ListType):
+    return schema_pb2.FieldType(
+        array_type=schema_pb2.ArrayType(
+            element_type=_beam_fieldtype_from_arrow_field(
+                arrow_type.value_field)))
+  elif isinstance(arrow_type, pa.MapType):
+    return schema_pb2.FieldType(
+        map_type=schema_pb2.MapType(
+            key_type=_beam_fieldtype_from_arrow_field(arrow_type.key_field),
+            
value_type=_beam_fieldtype_from_arrow_field(arrow_type.item_field)))
+  elif isinstance(arrow_type, pa.StructType):
+    # TODO
+    pass
+  else:
+    raise ValueError(f"Unrecognized arrow type: {arrow_type!r}")
+
+
+def _option_as_arrow_metadata(
+    beam_option: schema_pb2.Option) -> Tuple[bytes, bytes]:
+  return (
+      BEAM_OPTION_KEY_PREFIX + beam_option.name.encode('UTF-8'),
+      beam_option.SerializeToString())
+
+
+def arrow_schema_from_beam_schema(beam_schema: schema_pb2.Schema) -> pa.Schema:
+  return pa.schema(
+      [_arrow_field_from_beam_field(field) for field in beam_schema.fields],
+      {
+          BEAM_SCHEMA_ID_KEY: beam_schema.id,
+          **dict(
+              _option_as_arrow_metadata(option) for option in 
beam_schema.options)  # pylint: disable=line-too-long

Review Comment:
   Noting here that pylint and yapf disagree on this line: the yapf-formatted
   line exceeds pylint's line-length limit, hence the `line-too-long`
   suppression. I should file an issue to track this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to