yeandy commented on a change in pull request #17108:
URL: https://github.com/apache/beam/pull/17108#discussion_r829209496
##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -90,6 +89,19 @@ def __init__(self):
self.by_id = {}
self.by_typing = {}
+ def generate_new_id(self):
+ # Import uuid locally to guarantee we don't actually generate a uuid
+ # elsewhere in this file.
+ from uuid import uuid4
+ for _ in range(100):
Review comment:
I suppose if 100 attempts turns out to have lots of collision issues, we can easily bump it up.
##########
File path: sdks/python/apache_beam/typehints/schemas_test.py
##########
@@ -91,12 +93,34 @@ def test_typing_survives_proto_roundtrip(self):
test_cases = all_primitives + \
basic_array_types + \
- basic_map_types + \
- selected_schemas
+ basic_map_types
for test_case in test_cases:
self.assertEqual(
- test_case, typing_from_runner_api(typing_to_runner_api(test_case)))
+ test_case, typing_from_runner_api(
+ typing_to_runner_api(test_case,
+ schema_registry=SchemaTypeRegistry()),
+ schema_registry=SchemaTypeRegistry()
+ ))
+
+ for test_case in selected_schemas:
+ self.assert_namedtuple_equivalent(
+ test_case, typing_from_runner_api(
+ typing_to_runner_api(test_case,
+ schema_registry=SchemaTypeRegistry()),
+ schema_registry=SchemaTypeRegistry()
+ ))
+
+
+ def assert_namedtuple_equivalent(self, actual, expected):
+ # We can't check for equality between types
+ self.assertTrue(match_is_named_tuple(expected))
+ self.assertTrue(match_is_named_tuple(actual))
Review comment:
Can you clarify why we can't check for equality between `namedtuple` types?
##########
File path: sdks/python/apache_beam/typehints/schemas_test.py
##########
@@ -91,12 +93,34 @@ def test_typing_survives_proto_roundtrip(self):
test_cases = all_primitives + \
basic_array_types + \
- basic_map_types + \
- selected_schemas
+ basic_map_types
for test_case in test_cases:
self.assertEqual(
- test_case, typing_from_runner_api(typing_to_runner_api(test_case)))
+ test_case, typing_from_runner_api(
+ typing_to_runner_api(test_case,
+ schema_registry=SchemaTypeRegistry()),
+ schema_registry=SchemaTypeRegistry()
+ ))
+
+ for test_case in selected_schemas:
Review comment:
Do we want to break out `selected_schemas` for
`test_proto_survives_typing_roundtrip` too?
##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -144,161 +156,185 @@ def named_fields_to_schema(names_and_types):
schema_pb2.Field(name=name, type=typing_to_runner_api(type))
for (name, type) in names_and_types
],
- id=str(uuid4()))
+ id=SCHEMA_REGISTRY.generate_new_id())
def named_fields_from_schema(
schema): # (schema_pb2.Schema) -> typing.List[typing.Tuple[str, type]]
return [(field.name, typing_from_runner_api(field.type))
for field in schema.fields]
+def typing_to_runner_api(
+ type_: type,
+ schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY
+) -> schema_pb2.FieldType:
+ return SchemaTranslation(
+ schema_registry=schema_registry).typing_to_runner_api(type_)
+
+def typing_from_runner_api(
+ fieldtype_proto: schema_pb2.FieldType,
+ schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY
+) -> type:
+ return SchemaTranslation(
+ schema_registry=schema_registry).typing_from_runner_api(fieldtype_proto)
+
+class SchemaTranslation(object):
+ def __init__(self, schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY):
+ self.schema_registry = schema_registry
+
+ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
+ if isinstance(type_, schema_pb2.Schema):
+ return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
+
+ elif match_is_named_tuple(type_):
+ if hasattr(type_, _BEAM_SCHEMA_ID):
+ schema_id = getattr(type_, _BEAM_SCHEMA_ID)
+ schema = self.schema_registry.get_schema_by_id(
+ getattr(type_, _BEAM_SCHEMA_ID))
Review comment:
```suggestion
schema = self.schema_registry.get_schema_by_id(
schema_id)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]