udim commented on a change in pull request #14180:
URL: https://github.com/apache/beam/pull/14180#discussion_r592021130



##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -754,7 +754,7 @@ def _create_impl(self):
         lambda x: dumps(x, protocol), pickle.loads)
 
   def as_deterministic_coder(self, step_label, error_message=None):
-    return DeterministicFastPrimitivesCoder(self, step_label)
+    return FastPrimitivesCoder(self, requires_deterministic=step_label)

Review comment:
       The name `requires_deterministic` is confusing. I thought it was a 
boolean and that passing `step_label` was a bug. Would it make more sense to 
have a separate `step_label` arg? Or maybe rename the single arg to 
`deterministic_step_label`?

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -477,12 +486,40 @@ def decode_from_stream(self, stream, nested):
       return not not stream.read_byte()
     elif t == ITERABLE_LIKE_TYPE:
       return self.iterable_coder_impl.decode_from_stream(stream, nested)
+    elif t == PROTO_TYPE:
+      cls = self.decode_type(stream)
+      msg = cls()
+      msg.ParseFromString(stream.read_all(True))
+      return msg
+    elif t == DATACLASS_TYPE or t == NAMED_TUPLE_TYPE:
+      cls = self.decode_type(stream)
+      return cls(*self.iterable_coder_impl.decode_from_stream(stream, True))
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
       raise ValueError('Unknown type tag %x' % t)
 
 
+_unpickled_types = {}
+
+
+def _unpickle_type(bs):
+  t = _unpickled_types.get(bs, None)
+  if t is None:
+    t = _unpickled_types[bs] = dill.loads(bs)
+    # Fix unpicklable named tuples on older versions of Python.
+    if t.__base__ is tuple and hasattr(t, '_fields'):
+      try:
+        pickle.loads(pickle.dumps(t))

Review comment:
       Could you explain why this check is needed? The code that 
encodes/decodes NAMED_TUPLE_TYPE will always use dill.loads/dumps, and that 
works correctly. Any other place in the code that tries to pickle a NamedTuple 
won't use this code (won't be separately pickling just the type).
   
   Unless the object is pickled/unpickled using 
DeterministicFastPrimitivesCoder and then using FastPrimitivesCoder, and it's 
no longer picklable the second time?
   

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -477,12 +486,40 @@ def decode_from_stream(self, stream, nested):
       return not not stream.read_byte()
     elif t == ITERABLE_LIKE_TYPE:
       return self.iterable_coder_impl.decode_from_stream(stream, nested)
+    elif t == PROTO_TYPE:
+      cls = self.decode_type(stream)
+      msg = cls()
+      msg.ParseFromString(stream.read_all(True))
+      return msg
+    elif t == DATACLASS_TYPE or t == NAMED_TUPLE_TYPE:
+      cls = self.decode_type(stream)
+      return cls(*self.iterable_coder_impl.decode_from_stream(stream, True))
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
       raise ValueError('Unknown type tag %x' % t)
 
 
+_unpickled_types = {}
+
+
+def _unpickle_type(bs):
+  t = _unpickled_types.get(bs, None)
+  if t is None:
+    t = _unpickled_types[bs] = dill.loads(bs)
+    # Fix unpicklable named tuples on older versions of Python.

Review comment:
       Please mention the versions this is required for, so we can remove this 
when support for older Python versions is dropped.

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -372,9 +337,11 @@ def encode(self, value):
 
 class FastPrimitivesCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, fallback_coder_impl):
+  def __init__(self, fallback_coder_impl, requires_deterministic=None):
     self.fallback_coder_impl = fallback_coder_impl
     self.iterable_coder_impl = IterableCoderImpl(self)
+    self.requires_deterministic = requires_deterministic
+    self.decoded_types = {}

Review comment:
       unused?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to