TheNeuralBit commented on code in PR #22066:
URL: https://github.com/apache/beam/pull/22066#discussion_r910399845
##########
sdks/python/apache_beam/pvalue.py:
##########
@@ -159,6 +159,14 @@ def windowing(self):
self.producer.inputs)
return self._windowing
+ @property
+ def schema_proto(self):
+ from apache_beam.typehints.row_type import RowTypeConstraint
+ if not self.element_type is RowTypeConstraint:
+ return None
Review Comment:
Ah sorry this is something I was experimenting with that we don't need to
include in this PR. I've removed it.
The idea was to provide something similar to Java's `PCollection.getSchema`:
https://github.com/apache/beam/blob/de04604aee0275a1feeb216598179f445ac79445/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java#L332-L337
(this would have returned None in the "no inferred schema" case rather than
raising an error though)
##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
# pytype: skip-file
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import
match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
class RowTypeConstraint(typehints.TypeConstraint):
- def __init__(self, fields):
- self._fields = tuple(fields)
+ def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+ """For internal use only, no backwards comatibility guaratees. See
+
https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+ for guidance on creating PCollections with inferred schemas.
+
+ Note RowTypeConstraint does not currently store functions for converting
+ to/from the user type. Currently we only support a few types that satisfy
+ some assumptions:
+
+ - **to:** We assume that the user type can be constructed with field values
+ in order.
+ - **from:** We assume that field values can be accessed from instances of
+ the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+ The RowTypeConstraint constructor should not be called directly (even
+ internally to Beam). Prefer static methods ``from_user_type`` or
+ ``from_fields``.
+
+ Parameters:
+ fields: a list of (name, type) tuples, representing the schema inferred
+ from user_type.
+ user_type: constructor for a user type (e.g. NamedTuple class) that is
+ used to represent this schema in user code.
+ """
+ # Recursively wrap row types in a RowTypeConstraint
+ self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+ for name,
+ typ in fields)
+
+ self._user_type = user_type
+ if self._user_type is not None and hasattr(self._user_type,
+ _BEAM_SCHEMA_ID):
+ self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+ else:
+ self._schema_id = None
Review Comment:
Currently we assign IDs when mapping to a Schema proto, in
`apache_beam.typehints.schemas`, primarily just because the spec requires one:
https://github.com/apache/beam/blob/de04604aee0275a1feeb216598179f445ac79445/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/schema.proto#L40
I added a comment about this, thanks for the question.
##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
are cached by ID, so performing just one of them wouldn't necessarily
exercise
all code paths.
"""
- def test_typing_survives_proto_roundtrip(self):
- all_nonoptional_primitives = [
- np.int8,
- np.int16,
- np.int32,
- np.int64,
- np.float32,
- np.float64,
- bool,
- bytes,
- str,
- ]
-
- all_optional_primitives = [
- Optional[typ] for typ in all_nonoptional_primitives
- ]
-
- all_primitives = all_nonoptional_primitives + all_optional_primitives
-
- basic_array_types = [Sequence[typ] for typ in all_primitives]
-
- basic_map_types = [
- Mapping[key_type, value_type] for key_type,
- value_type in itertools.product(all_primitives, all_primitives)
- ]
-
- selected_schemas = [
- NamedTuple(
- 'AllPrimitives',
- [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
- NamedTuple(
- 'ComplexSchema',
- [
- ('id', np.int64),
- ('name', str),
- ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
- ('optional_array', Optional[Sequence[np.float32]]),
- ('array_optional', Sequence[Optional[bool]]),
- ('timestamp', Timestamp),
- ])
- ]
+ @parameterized.expand([(typ,) for typ in
+ all_primitives + \
+ basic_array_types + \
+ basic_map_types]
+ )
+ def test_typing_survives_proto_roundtrip(self, typ):
+ self.assertEqual(
+ typ,
+ typing_from_runner_api(
+ typing_to_runner_api(typ, schema_registry=SchemaTypeRegistry()),
+ schema_registry=SchemaTypeRegistry()))
- test_cases = all_primitives + \
- basic_array_types + \
- basic_map_types
+ @parameterized.expand([(AllPrimitives, ), (ComplexSchema, )])
Review Comment:
`parameterize` requires tuples (or dictionaries) of arguments that will be
passed to the test function. Even for a single function.
##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
# pytype: skip-file
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import
match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
class RowTypeConstraint(typehints.TypeConstraint):
- def __init__(self, fields):
- self._fields = tuple(fields)
+ def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+ """For internal use only, no backwards comatibility guaratees. See
+
https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+ for guidance on creating PCollections with inferred schemas.
+
+ Note RowTypeConstraint does not currently store functions for converting
+ to/from the user type. Currently we only support a few types that satisfy
+ some assumptions:
+
+ - **to:** We assume that the user type can be constructed with field values
+ in order.
+ - **from:** We assume that field values can be accessed from instances of
+ the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+ The RowTypeConstraint constructor should not be called directly (even
+ internally to Beam). Prefer static methods ``from_user_type`` or
+ ``from_fields``.
+
+ Parameters:
+ fields: a list of (name, type) tuples, representing the schema inferred
+ from user_type.
+ user_type: constructor for a user type (e.g. NamedTuple class) that is
+ used to represent this schema in user code.
+ """
+ # Recursively wrap row types in a RowTypeConstraint
+ self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+ for name,
+ typ in fields)
+
+ self._user_type = user_type
+ if self._user_type is not None and hasattr(self._user_type,
+ _BEAM_SCHEMA_ID):
+ self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+ else:
+ self._schema_id = None
+
+ @staticmethod
+ def from_user_type(user_type: type) -> Optional['RowTypeConstraint']:
+ if match_is_named_tuple(user_type):
+ fields = [(name, user_type.__annotations__[name])
+ for name in user_type._fields]
+
+ return RowTypeConstraint(fields=fields, user_type=user_type)
+
+ return None
+
+ @staticmethod
+ def from_fields(fields: Sequence[Tuple[str, type]]) -> 'RowTypeConstraint':
Review Comment:
Do you mean the string `'RowTypeConstraint'` rather than an actual
`RowTypeConstraint` typehint?
See [PEP 563](https://peps.python.org/pep-0563/) for details on that, as
well as the fix for it in Python 3.7 and above.
Speaking of which, I went ahead and updated this file with `from __future__
import annotations` so we can use `RowTypeConstraint` here.
Please let me know if that's not what you were getting at.
##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -41,6 +43,52 @@
from apache_beam.typehints.schemas import typing_to_runner_api
from apache_beam.utils.timestamp import Timestamp
+all_nonoptional_primitives = [
+ np.int8,
+ np.int16,
+ np.int32,
+ np.int64,
+ np.float32,
+ np.float64,
+ bool,
+ bytes,
+ str,
+]
+
+all_optional_primitives = [Optional[typ] for typ in all_nonoptional_primitives]
+
+all_primitives = all_nonoptional_primitives + all_optional_primitives
+
+basic_array_types = [Sequence[typ] for typ in all_primitives]
+
+basic_map_types = [
+ Mapping[key_type, value_type] for key_type,
+ value_type in itertools.product(all_primitives, all_primitives)
+]
+
+
+class AllPrimitives(NamedTuple):
+ field_int8: np.int8
+ field_int16: np.int16
+ field_int32: np.int32
+ field_int64: np.int64
+ field_float32: np.float32
+ field_float64: np.float64
+ field_bool: bool
+ field_bytes: bytes
+ field_str: str
+ field_optional_bool: Optional[bool]
+ field_optional_int32: Optional[np.int32]
Review Comment:
yeah, might as well. Done.
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -83,6 +83,7 @@
from apache_beam.utils.timestamp import Timestamp
PYTHON_ANY_URN = "beam:logical:pythonsdk_any:v1"
+PYTHON_USER_TYPE_OPTION_URN = "beam:python:user_type:v1"
Review Comment:
Whoops, this is part of #22082 and shouldn't be here, thanks! Removed.
##########
sdks/python/apache_beam/typehints/trivial_inference.py:
##########
@@ -438,8 +438,11 @@ def infer_return_type_func(f, input_types, debug=False,
depth=0):
from apache_beam.pvalue import Row
if state.stack[-pop_count].value == Row:
fields = state.stack[-1].value
- return_type = row_type.RowTypeConstraint(
- zip(fields, Const.unwrap_all(state.stack[-pop_count + 1:-1])))
+ return_type = row_type.RowTypeConstraint.from_fields(
+ list(
+ zip(
+ fields,
+ Const.unwrap_all(state.stack[-pop_count + 1:-1]))))
Review Comment:
Yeah unfortunately all of this Python bytecode inference logic is pretty
opaque. I'm only touching it here to adjust the call to `RowTypeConstraint` for
my changes (change to use static method `from_fields`, make sure argument is a
list, not an iterable).
Is there something specific I can do to improve clarity here?
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
result = self.by_id.get(unique_id, None)
return result[1] if result is not None else None
+ def get_id_by_typing(self, typing):
Review Comment:
Same here, this is part of #22082. Thanks for calling these out.
##########
sdks/python/apache_beam/typehints/schemas_test.py:
##########
@@ -50,68 +98,26 @@ class SchemaTest(unittest.TestCase):
are cached by ID, so performing just one of them wouldn't necessarily
exercise
all code paths.
"""
- def test_typing_survives_proto_roundtrip(self):
- all_nonoptional_primitives = [
- np.int8,
- np.int16,
- np.int32,
- np.int64,
- np.float32,
- np.float64,
- bool,
- bytes,
- str,
- ]
-
- all_optional_primitives = [
- Optional[typ] for typ in all_nonoptional_primitives
- ]
-
- all_primitives = all_nonoptional_primitives + all_optional_primitives
-
- basic_array_types = [Sequence[typ] for typ in all_primitives]
-
- basic_map_types = [
- Mapping[key_type, value_type] for key_type,
- value_type in itertools.product(all_primitives, all_primitives)
- ]
-
- selected_schemas = [
- NamedTuple(
- 'AllPrimitives',
- [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
- NamedTuple(
- 'ComplexSchema',
- [
- ('id', np.int64),
- ('name', str),
- ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
- ('optional_array', Optional[Sequence[np.float32]]),
- ('array_optional', Sequence[Optional[bool]]),
- ('timestamp', Timestamp),
- ])
- ]
+ @parameterized.expand([(typ,) for typ in
Review Comment:
Just to avoid the reserved word `type`. I changed this to `user_type` for
clarity.
##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -17,19 +17,89 @@
# pytype: skip-file
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+
from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import
match_is_named_tuple
+
+# Name of the attribute added to user types (existing and generated) to store
+# the corresponding schema ID
+_BEAM_SCHEMA_ID = "_beam_schema_id"
class RowTypeConstraint(typehints.TypeConstraint):
- def __init__(self, fields):
- self._fields = tuple(fields)
+ def __init__(self, fields: List[Tuple[str, type]], user_type=None):
+ """For internal use only, no backwards comatibility guaratees. See
+
https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
+ for guidance on creating PCollections with inferred schemas.
+
+ Note RowTypeConstraint does not currently store functions for converting
+ to/from the user type. Currently we only support a few types that satisfy
+ some assumptions:
+
+ - **to:** We assume that the user type can be constructed with field values
+ in order.
+ - **from:** We assume that field values can be accessed from instances of
+ the type by attribute (i.e. with ``getattr(obj, field_name)``).
+
+ The RowTypeConstraint constructor should not be called directly (even
+ internally to Beam). Prefer static methods ``from_user_type`` or
+ ``from_fields``.
+
+ Parameters:
+ fields: a list of (name, type) tuples, representing the schema inferred
+ from user_type.
+ user_type: constructor for a user type (e.g. NamedTuple class) that is
+ used to represent this schema in user code.
+ """
+ # Recursively wrap row types in a RowTypeConstraint
+ self._fields = tuple((name, RowTypeConstraint.from_user_type(typ) or typ)
+ for name,
+ typ in fields)
+
+ self._user_type = user_type
+ if self._user_type is not None and hasattr(self._user_type,
+ _BEAM_SCHEMA_ID):
+ self._schema_id = getattr(self._user_type, _BEAM_SCHEMA_ID)
+ else:
+ self._schema_id = None
Review Comment:
Thanks, that does at least allow dropping the `hasattr` call
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) ->
schema_pb2.FieldType:
if isinstance(type_, schema_pb2.Schema):
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
- elif match_is_named_tuple(type_):
- if hasattr(type_, _BEAM_SCHEMA_ID):
- schema_id = getattr(type_, _BEAM_SCHEMA_ID)
- schema = self.schema_registry.get_schema_by_id(
- getattr(type_, _BEAM_SCHEMA_ID))
- else:
- schema_id = self.schema_registry.generate_new_id()
+ if isinstance(type_, row_type.RowTypeConstraint):
+ if type_.schema_id is None:
+ schema_id = SCHEMA_REGISTRY.generate_new_id()
+ type_.set_schema_id(schema_id)
schema = None
- setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+ else:
+ schema_id = type_.schema_id
+ schema = self.schema_registry.get_schema_by_id(schema_id)
if schema is None:
- fields = [
- schema_pb2.Field(
- name=name,
- type=typing_to_runner_api(type_.__annotations__[name]))
- for name in type_._fields
- ]
- schema = schema_pb2.Schema(fields=fields, id=schema_id)
- self.schema_registry.add(type_, schema)
-
+ # Either user_type was not annotated with a schema id, or there was
+ # no schema in the registry with the id. The latter should only happen
+ # in tests.
+ # Either way, we need to generate a new schema proto.
+ schema = schema_pb2.Schema(
+ fields=[
+ schema_pb2.Field(
+ name=name, type=self.typing_to_runner_api(field_type))
+ for (name, field_type) in type_._fields
+ ],
+ id=schema_id)
+ self.schema_registry.add(type_.user_type, schema)
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
- elif isinstance(type_, row_type.RowTypeConstraint):
- return schema_pb2.FieldType(
- row_type=schema_pb2.RowType(
- schema=schema_pb2.Schema(
- fields=[
- schema_pb2.Field(
- name=name, type=typing_to_runner_api(field_type))
- for (name, field_type) in type_._fields
- ],
- id=self.schema_registry.generate_new_id())))
+ else:
+ # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+ # dataclass)
+ row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+ if row_type_constraint is not None:
+ return self.typing_to_runner_api(row_type_constraint)
# All concrete types (other than NamedTuple sub-classes) should map to
# a supported primitive type.
- elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+ if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
elif _match_is_exactly_mapping(type_):
- key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+ key_type, value_type = map(self.typing_to_runner_api, _get_args(type_))
Review Comment:
Sure, `SchemaTranslation` just tracks the registry that is used for caching
schemas by ID. The intention is that we can specify a registry to use at the
entrypoint (the free function `typing_from_runner_api`), then drop into the
implementation which keeps track of that context.
These are just places that I missed when moving the logic over to use
`SchemaTranslation` in #17108. The intention was for all recursive calls to use
self, and only have the free functions as the input.
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -116,6 +118,14 @@ def get_schema_by_id(self, unique_id):
result = self.by_id.get(unique_id, None)
return result[1] if result is not None else None
+ def get_id_by_typing(self, typing):
+ result = self.by_typing.get(typing, None)
Review Comment:
This is obsolete, I didn't intend to keep this code.
##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -191,64 +197,61 @@ def typing_to_runner_api(self, type_: type) ->
schema_pb2.FieldType:
if isinstance(type_, schema_pb2.Schema):
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))
- elif match_is_named_tuple(type_):
- if hasattr(type_, _BEAM_SCHEMA_ID):
- schema_id = getattr(type_, _BEAM_SCHEMA_ID)
- schema = self.schema_registry.get_schema_by_id(
- getattr(type_, _BEAM_SCHEMA_ID))
- else:
- schema_id = self.schema_registry.generate_new_id()
+ if isinstance(type_, row_type.RowTypeConstraint):
+ if type_.schema_id is None:
+ schema_id = SCHEMA_REGISTRY.generate_new_id()
+ type_.set_schema_id(schema_id)
schema = None
- setattr(type_, _BEAM_SCHEMA_ID, schema_id)
+ else:
+ schema_id = type_.schema_id
+ schema = self.schema_registry.get_schema_by_id(schema_id)
if schema is None:
- fields = [
- schema_pb2.Field(
- name=name,
- type=typing_to_runner_api(type_.__annotations__[name]))
- for name in type_._fields
- ]
- schema = schema_pb2.Schema(fields=fields, id=schema_id)
- self.schema_registry.add(type_, schema)
-
+ # Either user_type was not annotated with a schema id, or there was
+ # no schema in the registry with the id. The latter should only happen
+ # in tests.
+ # Either way, we need to generate a new schema proto.
+ schema = schema_pb2.Schema(
+ fields=[
+ schema_pb2.Field(
+ name=name, type=self.typing_to_runner_api(field_type))
+ for (name, field_type) in type_._fields
+ ],
+ id=schema_id)
+ self.schema_registry.add(type_.user_type, schema)
return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema))
-
- elif isinstance(type_, row_type.RowTypeConstraint):
- return schema_pb2.FieldType(
- row_type=schema_pb2.RowType(
- schema=schema_pb2.Schema(
- fields=[
- schema_pb2.Field(
- name=name, type=typing_to_runner_api(field_type))
- for (name, field_type) in type_._fields
- ],
- id=self.schema_registry.generate_new_id())))
+ else:
+ # See if this is coercible to a RowTypeConstraint (e.g. a NamedTuple or
+ # dataclass)
+ row_type_constraint = row_type.RowTypeConstraint.from_user_type(type_)
+ if row_type_constraint is not None:
+ return self.typing_to_runner_api(row_type_constraint)
Review Comment:
This pattern is pretty common for recursive schema-handling logic in both
Python and Java. See for example:
https://github.com/apache/beam/blob/07ed486d653df440b7993679bc6226e0dc4dd6dc/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java#L323-L337
I'm open to suggestions on how to refactor, but I'd also prefer to leave
them for a separate PR to avoid confusion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]