yeandy commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910261330
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -297,8 +300,12 @@ def typing_from_runner_api(
self.typing_from_runner_api(fieldtype_proto.map_type.value_type)]
elif type_info == "row_type":
schema = fieldtype_proto.row_type.schema
+ # First look for user type in the registray
Review Comment:
nit:
```suggestion
# First look for user type in the registry
```
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -83,6 +83,7 @@
from apache_beam.utils.timestamp import Timestamp
PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1"
+PYTHON_USER_TYPE_OPTION_URN = "beam:python:user_type:v1"
Review Comment:
I may have missed this, but where is this used?
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
result = self.by_id.get(unique_id, None)
return result[1] if result is not None else None
+ def get_id_by_typing(self, typing):
Review Comment:
Is this intended to be used? I didn't see any references to it.
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) ->
schema_pb2.FieldType:
if isinstance(type_, schema_pb2.Schema):
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
- elif match_is_named_tuple(type_):
- if hasattr(type_, _BEAM_SCHEMA_ID):
- schema_id = getattr(type_, _BEAM_SCHEMA_ID)
- schema = self.schema_registry.get_schema_by_id(
- getattr(type_, _BEAM_SCHEMA_ID))
- else:
- schema_id = self.schema_registry.generate_new_id()
+ if isinstance(type_, row_type.RowTypeConstraint):
+ if type_.schema_id is None:
+ schema_id = SCHEMA_REGISTRY.generate_new_id()
+ type_.set_schema_id(schema_id)
schema = None
- setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+ else:
+ schema_id = type_.schema_id
+ schema = self.schema_registry.get_schema_by_id(schema_id)
if schema is None:
- fields = [
- schema_pb2.Field(
- name=name,
- type=typing_to_runner_api(type_.__annotations__[name]))
- for name in type_._fields
- ]
- schema = schema_pb2.Schema(fields=fields, id=schema_id)
- self.schema_registry.add(type_, schema)
-
+ # Either user_type was not annotated with a schema id, or there was
+ # no schema in the registry with the id. The latter should only happen
+ # in tests.
+ # Either way, we need to generate a new schema proto.
+ schema = schema_pb2.Schema(
+ fields=[
+ schema_pb2.Field(
+ name=name, type=self.typing_to_runner_api(field_type))
+ for (name, field_type) in type_._fields
+ ],
+ id=schema_id)
+ self.schema_registry.add(type_.user_type, schema)
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
- elif isinstance(type_, row_type.RowTypeConstraint):
- return schema_pb2.FieldType(
- row_type=schema_pb2.RowType(
- schema=schema_pb2.Schema(
- fields=[
- schema_pb2.Field(
- name=name, type=typing_to_runner_api(field_type))
- for (name, field_type) in type_._fields
- ],
- id=self.schema_registry.generate_new_id())))
+ else:
+ # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+ # dataclass)
+ row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+ if row_type_constraint is not None:
+ return self.typing_to_runner_api(row_type_constraint)
# All concrete types (other than NamedTuple sub-classes) should map to
# a supported primitive type.
- elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+ if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
elif _match_is_exactly_mapping(type_):
- key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+ key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))
Review Comment:
Can you clarify the difference between calling `self.typing_to_runner_api` here
and calling the module-level `typing_to_runner_api` outside of `SchemaTranslation`?
The same question applies to `typing_from_runner_api`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]