[ 
https://issues.apache.org/jira/browse/BEAM-11719?focusedWorklogId=564336&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-564336
 ]

ASF GitHub Bot logged work on BEAM-11719:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Mar/21 06:27
            Start Date: 11/Mar/21 06:27
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #14180:
URL: https://github.com/apache/beam/pull/14180#discussion_r592095823



##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -754,7 +754,7 @@ def _create_impl(self):
         lambda x: dumps(x, protocol), pickle.loads)
 
   def as_deterministic_coder(self, step_label, error_message=None):
-    return DeterministicFastPrimitivesCoder(self, step_label)
+    return FastPrimitivesCoder(self, requires_deterministic=step_label)

Review comment:
       Good point. Renamed.

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -477,12 +486,40 @@ def decode_from_stream(self, stream, nested):
       return not not stream.read_byte()
     elif t == ITERABLE_LIKE_TYPE:
       return self.iterable_coder_impl.decode_from_stream(stream, nested)
+    elif t == PROTO_TYPE:
+      cls = self.decode_type(stream)
+      msg = cls()
+      msg.ParseFromString(stream.read_all(True))
+      return msg
+    elif t == DATACLASS_TYPE or t == NAMED_TUPLE_TYPE:
+      cls = self.decode_type(stream)
+      return cls(*self.iterable_coder_impl.decode_from_stream(stream, True))
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
       raise ValueError('Unknown type tag %x' % t)
 
 
+_unpickled_types = {}
+
+
+def _unpickle_type(bs):
+  t = _unpickled_types.get(bs, None)
+  if t is None:
+    t = _unpickled_types[bs] = dill.loads(bs)
+    # Fix unpicklable named tuples on older versions of Python.

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 564336)
    Time Spent: 8h 10m  (was: 8h)

> Enforce deterministic coding for GroupByKey and Stateful DoFns
> --------------------------------------------------------------
>
>                 Key: BEAM-11719
>                 URL: https://issues.apache.org/jira/browse/BEAM-11719
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Robert Bradshaw
>            Priority: P1
>          Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> If a non-deterministic coder, such as pickling, is used for keys this can 
> result in two copies of the same key being grouped separately (based on their 
> encodings). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to