robertwb commented on a change in pull request #17037:
URL: https://github.com/apache/beam/pull/17037#discussion_r826150137



##########
File path: sdks/python/apache_beam/coders/coder_impl.pxd
##########
@@ -92,6 +92,13 @@ cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
 
 cdef dict _unpickled_types
 
+
+cdef class MapCoderImpl(StreamCoderImpl):

Review comment:
       No reason that I'm aware of. FYI, see the comment at the top of the 
coder_impl file as to why there's a separation here. 

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -420,22 +420,41 @@ def encode_to_stream(self, value, stream, nested):
     elif t in _ITERABLE_LIKE_TYPES:
       stream.write_byte(ITERABLE_LIKE_TYPE)
       self.iterable_coder_impl.encode_to_stream(value, stream, nested)
-    # All deterministic encodings should be above this clause,
-    # all non-deterministic ones below.
-    elif self.requires_deterministic_step_label is not None:
-      self.encode_special_deterministic(value, stream)
     elif t is dict:
       dict_value = value  # for typing
       stream.write_byte(DICT_TYPE)
       stream.write_var_int64(len(dict_value))
-      for k, v in dict_value.items():
-        self.encode_to_stream(k, stream, True)
-        self.encode_to_stream(v, stream, True)
+      if self.requires_deterministic_step_label is not None:
+        try:
+          ordered_kvs = sorted(dict_value.items())
+        except Exception as exn:
+          raise TypeError(
+              "Unable to deterministically order keys of dict for '%s'" %
+              self.requires_deterministic_step_label) from exn
+        for k, v in ordered_kvs:
+          self.encode_to_stream(k, stream, True)
+          self.encode_to_stream(v, stream, True)
+      else:
+        # dict.items() is optimized by Cython.
+        for k, v in dict_value.items():
+          self.encode_to_stream(k, stream, True)
+          self.encode_to_stream(v, stream, True)
     elif t is set:
       stream.write_byte(SET_TYPE)
       stream.write_var_int64(len(value))
+      if self.requires_deterministic_step_label is not None:
+        try:
+          value = sorted(value)
+        except Exception as exn:
+          raise TypeError(
+              "Unable to deterministically order element of set for '%s'" %
+              self.requires_deterministic_step_label) from exn
       for e in value:
         self.encode_to_stream(e, stream, True)
+    # All deterministic encodings should be above this clause,

Review comment:
       Clarified. 

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -651,18 +670,27 @@ class MapCoderImpl(StreamCoderImpl):
   def __init__(
       self,
       key_coder,  # type: CoderImpl
-      value_coder  # type: CoderImpl
+      value_coder,  # type: CoderImpl
+      is_deterministic = False
   ):
     self._key_coder = key_coder
     self._value_coder = value_coder
+    self._is_deterministic = is_deterministic
 
   def encode_to_stream(self, dict_value, out, nested):
     out.write_bigendian_int32(len(dict_value))
-    for key, value in dict_value.items():
-      # Note this implementation always uses nested context when encoding keys
-      # and values which differs from Java. See note in docstring.
-      self._key_coder.encode_to_stream(key, out, True)
-      self._value_coder.encode_to_stream(value, out, True)
+    # Note this implementation always uses nested context when encoding keys
+    # and values which differs from Java. See note in docstring.
+    if self._is_deterministic:
+      for key, value in sorted(dict_value.items()):
+        self._key_coder.encode_to_stream(key, out, True)
+        self._value_coder.encode_to_stream(value, out, True)
+    else:
+      # This loop is separate form the above so the dict.items() call will be

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to