yeandy commented on a change in pull request #17108:
URL: https://github.com/apache/beam/pull/17108#discussion_r829190149



##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -144,161 +156,185 @@ def named_fields_to_schema(names_and_types):
           schema_pb2.Field(name=name, type=typing_to_runner_api(type))
           for (name, type) in names_and_types
       ],
-      id=str(uuid4()))
+      id=SCHEMA_REGISTRY.generate_new_id())
 
 
 def named_fields_from_schema(
     schema):  # (schema_pb2.Schema) -> typing.List[typing.Tuple[str, type]]
   return [(field.name, typing_from_runner_api(field.type))
           for field in schema.fields]
 
+def typing_to_runner_api(
+    type_: type,
+    schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY
+) -> schema_pb2.FieldType:
+  return SchemaTranslation(
+      schema_registry=schema_registry).typing_to_runner_api(type_)
+
+def typing_from_runner_api(
+    fieldtype_proto: schema_pb2.FieldType,
+    schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY
+) -> type:
+  return SchemaTranslation(
+      schema_registry=schema_registry).typing_from_runner_api(fieldtype_proto)
+
+class SchemaTranslation(object):
+  def __init__(self, schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY):
+    self.schema_registry = schema_registry
+
+  def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
+    if isinstance(type_, schema_pb2.Schema):
+      return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
+
+    elif match_is_named_tuple(type_):
+      if hasattr(type_, _BEAM_SCHEMA_ID):
+        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
+        schema = self.schema_registry.get_schema_by_id(
+            getattr(type_, _BEAM_SCHEMA_ID))

Review comment:
   ```suggestion
           schema = self.schema_registry.get_schema_by_id(
               schema_id)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to