AnandInguva commented on a change in pull request #17037:
URL: https://github.com/apache/beam/pull/17037#discussion_r824118498
##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -420,22 +420,41 @@ def encode_to_stream(self, value, stream, nested):
elif t in _ITERABLE_LIKE_TYPES:
stream.write_byte(ITERABLE_LIKE_TYPE)
self.iterable_coder_impl.encode_to_stream(value, stream, nested)
- # All deterministic encodings should be above this clause,
- # all non-deterministic ones below.
- elif self.requires_deterministic_step_label is not None:
- self.encode_special_deterministic(value, stream)
elif t is dict:
dict_value = value # for typing
stream.write_byte(DICT_TYPE)
stream.write_var_int64(len(dict_value))
- for k, v in dict_value.items():
- self.encode_to_stream(k, stream, True)
- self.encode_to_stream(v, stream, True)
+ if self.requires_deterministic_step_label is not None:
+ try:
+ ordered_kvs = sorted(dict_value.items())
+ except Exception as exn:
+ raise TypeError(
+ "Unable to deterministically order keys of dict for '%s'" %
+ self.requires_deterministic_step_label) from exn
+ for k, v in ordered_kvs:
+ self.encode_to_stream(k, stream, True)
+ self.encode_to_stream(v, stream, True)
+ else:
+ # dict.items() is optimized by Cython.
+ for k, v in dict_value.items():
+ self.encode_to_stream(k, stream, True)
+ self.encode_to_stream(v, stream, True)
elif t is set:
stream.write_byte(SET_TYPE)
stream.write_var_int64(len(value))
+ if self.requires_deterministic_step_label is not None:
+ try:
+ value = sorted(value)
+ except Exception as exn:
+ raise TypeError(
+ "Unable to deterministically order element of set for '%s'" %
+ self.requires_deterministic_step_label) from exn
for e in value:
self.encode_to_stream(e, stream, True)
+ # All deterministic encodings should be above this clause,
Review comment:
Is this comment still valid? IIUC, there is non-deterministic code above it.
##########
File path: sdks/python/apache_beam/coders/coder_impl.pxd
##########
@@ -92,6 +92,13 @@ cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
cdef dict _unpickled_types
+
+cdef class MapCoderImpl(StreamCoderImpl):
Review comment:
Why didn't we add this before? I am not so familiar with this
implementation.
##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -651,18 +670,27 @@ class MapCoderImpl(StreamCoderImpl):
def __init__(
self,
key_coder, # type: CoderImpl
- value_coder # type: CoderImpl
+ value_coder, # type: CoderImpl
+ is_deterministic = False
):
self._key_coder = key_coder
self._value_coder = value_coder
+ self._is_deterministic = is_deterministic
def encode_to_stream(self, dict_value, out, nested):
out.write_bigendian_int32(len(dict_value))
- for key, value in dict_value.items():
- # Note this implementation always uses nested context when encoding keys
- # and values which differs from Java. See note in docstring.
- self._key_coder.encode_to_stream(key, out, True)
- self._value_coder.encode_to_stream(value, out, True)
+ # Note this implementation always uses nested context when encoding keys
+ # and values which differs from Java. See note in docstring.
+ if self._is_deterministic:
+ for key, value in sorted(dict_value.items()):
+ self._key_coder.encode_to_stream(key, out, True)
+ self._value_coder.encode_to_stream(value, out, True)
+ else:
+ # This loop is separate form the above so the dict.items() call will be
Review comment:
```suggestion
# This loop is separate from the above so the dict.items() call will be
```
##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -651,18 +670,27 @@ class MapCoderImpl(StreamCoderImpl):
def __init__(
self,
key_coder, # type: CoderImpl
- value_coder # type: CoderImpl
+ value_coder, # type: CoderImpl
+ is_deterministic = False
):
self._key_coder = key_coder
self._value_coder = value_coder
+ self._is_deterministic = is_deterministic
def encode_to_stream(self, dict_value, out, nested):
out.write_bigendian_int32(len(dict_value))
- for key, value in dict_value.items():
- # Note this implementation always uses nested context when encoding keys
- # and values which differs from Java. See note in docstring.
- self._key_coder.encode_to_stream(key, out, True)
- self._value_coder.encode_to_stream(value, out, True)
+ # Note this implementation always uses nested context when encoding keys
+ # and values which differs from Java. See note in docstring.
+ if self._is_deterministic:
+ for key, value in sorted(dict_value.items()):
+ self._key_coder.encode_to_stream(key, out, True)
+ self._value_coder.encode_to_stream(value, out, True)
+ else:
+ # This loop is separate form the above so the dict.items() call will be
Review comment:
Both loops have dict.items(). For my learning, why would having two
separate loops help here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]