claudevdm commented on code in PR #35433:
URL: https://github.com/apache/beam/pull/35433#discussion_r2167526959
##########
sdks/python/apache_beam/coders/coders_test_common.py:
##########
@@ -606,6 +608,131 @@ def test_param_windowed_value_coder(self):
1, (window.IntervalWindow(11, 21), ),
PaneInfo(True, False, 1, 2, 3))))
+ def test_cross_process_deterministic_special_types(self):
+ """Test cross-process determinism for all special deterministic types"""
+ # pylint: disable=line-too-long
+ script = textwrap.dedent(
+ '''\
+ import pickle
+ import sys
+ import collections
+ import enum
+ import logging
+
+ from apache_beam.coders import coders
+ from apache_beam.coders import proto2_coder_test_messages_pb2 as
test_message
+ from typing import NamedTuple
+
+ try:
+ import dataclasses
+ except ImportError:
+ dataclasses = None
+
+ logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ stream=sys.stderr,
+ force=True
+ )
+
+ # Define all the special types that encode_special_deterministic
handles
+ MyNamedTuple = collections.namedtuple('A', ['x', 'y'])
+ MyTypedNamedTuple = NamedTuple('MyTypedNamedTuple', [('f1', int),
('f2', str)])
+
+ class MyEnum(enum.Enum):
+ E1 = 5
+ E2 = enum.auto()
+ E3 = 'abc'
+
+ MyIntEnum = enum.IntEnum('MyIntEnum', 'I1 I2 I3')
+ MyIntFlag = enum.IntFlag('MyIntFlag', 'F1 F2 F3')
+ MyFlag = enum.Flag('MyFlag', 'F1 F2 F3')
+
+ if dataclasses is not None:
+ @dataclasses.dataclass(frozen=True)
+ class FrozenDataClass:
+ a: int
+ b: int
+
+ class DefinesGetAndSetState:
+ def __init__(self, value):
+ self.value = value
+
+ def __getstate__(self):
+ return self.value
+
+ def __setstate__(self, value):
+ self.value = value
+
+ def __eq__(self, other):
+ return type(other) is type(self) and other.value == self.value
+
+ # Test cases for all special deterministic types
+ # NOTE: When this script run in a subprocess the module is considered
+ # __main__. Dill cannot pickle enums in __main__ because it
+ # needs to define a way to create the type if it does not exist
+ # in the session, and reaches recursion depth limits.
+ test_cases = [
+ ("proto_message", test_message.MessageA(field1='value')),
+ ("named_tuple_simple", MyNamedTuple(1, 2)),
+ ("typed_named_tuple", MyTypedNamedTuple(1, 'a')),
+ ("named_tuple_list", [MyNamedTuple(1, 2), MyTypedNamedTuple(1,
'a')]),
+ # ("enum_single", MyEnum.E1),
Review Comment:
Yes. In the NOTE section I mention that dill breaks when trying to pickle enums
defined in `__main__` (which is the module name when the script runs in a
subprocess). I will uncomment this once we switch to cloudpickle, which does not fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]