robertwb commented on a change in pull request #14180:
URL: https://github.com/apache/beam/pull/14180#discussion_r592095999



##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -477,12 +486,40 @@ def decode_from_stream(self, stream, nested):
       return not not stream.read_byte()
     elif t == ITERABLE_LIKE_TYPE:
       return self.iterable_coder_impl.decode_from_stream(stream, nested)
+    elif t == PROTO_TYPE:
+      cls = self.decode_type(stream)
+      msg = cls()
+      msg.ParseFromString(stream.read_all(True))
+      return msg
+    elif t == DATACLASS_TYPE or t == NAMED_TUPLE_TYPE:
+      cls = self.decode_type(stream)
+      return cls(*self.iterable_coder_impl.decode_from_stream(stream, True))
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
       raise ValueError('Unknown type tag %x' % t)
 
 
+_unpickled_types = {}
+
+
+def _unpickle_type(bs):
+  t = _unpickled_types.get(bs, None)
+  if t is None:
+    t = _unpickled_types[bs] = dill.loads(bs)
+    # Fix unpicklable named tuples on older versions of Python.
+    if t.__base__ is tuple and hasattr(t, '_fields'):
+      try:
+        pickle.loads(pickle.dumps(t))

Review comment:
       Precisely that case (possibly nested in another pickled object).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to