yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r912078084


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,94 @@
 
 # pytype: skip-file
 
+from __future__ import annotations
+
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
 from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import 
match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
 
 
 class RowTypeConstraint(typehints.TypeConstraint):
-  def __init__(self, fields):
-    self._fields = tuple(fields)
+  def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+    """For internal use only, no backwards comatibility guaratees.  See
+    
https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+    for guidance on creating PCollections with inferred schemas.
+
+    Note RowTypeConstraint does not currently store functions for converting
+    to/from the user type. Currently we only support a few types that satisfy

Review Comment:
   Can you clarify? I see the `from_user_type` function to convert from user 
type?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) -> 
schema_pb2.FieldType:
     if isinstance(type_, schema_pb2.Schema):
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
 
-    elif match_is_named_tuple(type_):
-      if hasattr(type_, _BEAM_SCHEMA_ID):
-        schema_id = getattr(type_, _BEAM_SCHEMA_ID)
-        schema = self.schema_registry.get_schema_by_id(
-            getattr(type_, _BEAM_SCHEMA_ID))
-      else:
-        schema_id = self.schema_registry.generate_new_id()
+    if isinstance(type_, row_type.RowTypeConstraint):
+      if type_.schema_id is None:
+        schema_id = SCHEMA_REGISTRY.generate_new_id()
+        type_.set_schema_id(schema_id)
         schema = None
-        setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+      else:
+        schema_id = type_.schema_id
+        schema = self.schema_registry.get_schema_by_id(schema_id)
 
       if schema is None:
-        fields = [
-            schema_pb2.Field(
-                name=name,
-                type=typing_to_runner_api(type_.__annotations__[name]))
-            for name in type_._fields
-        ]
-        schema = schema_pb2.Schema(fields=fields, id=schema_id)
-        self.schema_registry.add(type_, schema)
-
+        # Either user_type was not annotated with a schema id, or there was
+        # no schema in the registry with the id. The latter should only happen
+        # in tests.
+        # Either way, we need to generate a new schema proto.
+        schema = schema_pb2.Schema(
+            fields=[
+                schema_pb2.Field(
+                    name=name, type=self.typing_to_runner_api(field_type))
+                for (name, field_type) in type_._fields
+            ],
+            id=schema_id)
+        self.schema_registry.add(type_.user_type, schema)
       return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
-    elif isinstance(type_, row_type.RowTypeConstraint):
-      return schema_pb2.FieldType(
-          row_type=schema_pb2.RowType(
-              schema=schema_pb2.Schema(
-                  fields=[
-                      schema_pb2.Field(
-                          name=name, type=typing_to_runner_api(field_type))
-                      for (name, field_type) in type_._fields
-                  ],
-                  id=self.schema_registry.generate_new_id())))
+    else:
+      # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+      # dataclass)
+      row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+      if row_type_constraint is not None:
+        return self.typing_to_runner_api(row_type_constraint)
 
     # All concrete types (other than NamedTuple sub-classes) should map to
     # a supported primitive type.
-    elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
       return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
 
     elif _match_is_exactly_mapping(type_):
-      key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+      key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))

Review Comment:
   Ah, thanks, I see the distinction now. I see the `schema_registry` arg in 
the second pass 😄 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to