dpcollins-google commented on a change in pull request #15817:
URL: https://github.com/apache/beam/pull/15817#discussion_r740492418
##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -1014,6 +1016,42 @@ def as_deterministic_coder(self, step_label,
error_message=None):
return self
+class ProtoPlusCoder(FastCoder):
+ """A Coder for Google Protocol Buffers wrapped using the proto-plus library.
+
+ ProtoPlusCoder is registered in the global CoderRegistry as the default coder
+ for any proto.Message object.
+ """
+ def __init__(self, proto_plus_message_type):
+ # type: (Type[proto.Message]) -> None
+ self.proto_plus_message_type = proto_plus_message_type
+
+ def _create_impl(self):
+ return coder_impl.ProtoPlusCoderImpl(self.proto_plus_message_type)
+
+ def is_deterministic(self):
+ return True
Review comment:
Yes, it uses the deterministic proto coder.
##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -310,6 +311,19 @@ def encode(self, value):
return value.SerializePartialToString(deterministic=True)
+class ProtoPlusCoderImpl(SimpleCoderImpl):
Review comment:
It shouldn't. Either they're currently not resolving a coder (and are
broken), or they're already registering a custom coder (and this is ignored),
or they're not using proto-plus types in their interfaces.
##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -1014,6 +1016,42 @@ def as_deterministic_coder(self, step_label,
error_message=None):
return self
+class ProtoPlusCoder(FastCoder):
+ """A Coder for Google Protocol Buffers wrapped using the proto-plus library.
+
+ ProtoPlusCoder is registered in the global CoderRegistry as the default coder
+ for any proto.Message object.
+ """
+ def __init__(self, proto_plus_message_type):
+ # type: (Type[proto.Message]) -> None
+ self.proto_plus_message_type = proto_plus_message_type
+
+ def _create_impl(self):
+ return coder_impl.ProtoPlusCoderImpl(self.proto_plus_message_type)
+
+ def is_deterministic(self):
+ return True
+
+ def __eq__(self, other):
+ return (
+ type(self) == type(other) and
+ self.proto_plus_message_type == other.proto_plus_message_type)
+
+ def __hash__(self):
+ return hash(self.proto_plus_message_type)
+
+ @classmethod
+ def from_type_hint(cls, typehint, unused_registry):
+ if issubclass(typehint, proto.Message):
Review comment:
Yes. Currently they fall back to PickleCoder and fail to serialize the
state.
##########
File path: sdks/python/apache_beam/coders/coders_test.py
##########
@@ -109,6 +111,45 @@ def test_deterministic_proto_coder_determinism(self):
self.assertEqual(coder.encode(mm_forward), coder.encode(mm_reverse))
+class ProtoPlusMessageB(proto.Message):
+ field1 = proto.Field(proto.BOOL, number=1)
+
+
+class ProtoPlusMessageA(proto.Message):
+ field1 = proto.Field(proto.STRING, number=1)
+ field2 = proto.RepeatedField(ProtoPlusMessageB, number=2)
+
+
+class ProtoPlusMessageWithMap(proto.Message):
+ field1 = proto.MapField(proto.STRING, ProtoPlusMessageA, number=1)
+
+
+class ProtoPlusCoderTest(unittest.TestCase):
+ def test_proto_plus_coder(self):
Review comment:
See line 140; this is tested.
##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -310,6 +311,19 @@ def encode(self, value):
return value.SerializePartialToString(deterministic=True)
+class ProtoPlusCoderImpl(SimpleCoderImpl):
+ """For internal use only; no backwards-compatibility guarantees."""
+ def __init__(self, proto_plus_type):
+ # type: (Type[proto.Message]) -> None
+ self.proto_plus_type = proto_plus_type
+
+ def encode(self, value):
+ return value._pb.SerializePartialToString(deterministic=True)
+
+ def decode(self, value):
+ return self.proto_plus_type.deserialize(value)
Review comment:
This is the preferred way to deserialize proto-plus types. I can't use
the serialize method on line 321 because it is not deterministic, so I must
drop down to the protobuf interface to serialize.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]