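Implement a coder optimized for encoding primitive values (None, int, float, str, unicode, and lists and tuples, whose elements are encoded recursively). This coder falls back to another coder (e.g. PickleCoder) for all other types, but is much faster for ints, strings, etc.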
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f58c2bda Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f58c2bda Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f58c2bda Branch: refs/heads/python-sdk Commit: f58c2bdaae241718e0b4e250a90304317445613f Parents: 69f895a Author: Robert Bradshaw <rober...@google.com> Authored: Tue Jul 12 15:58:56 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Mon Jul 18 17:48:33 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coder_impl.pxd | 8 +++ sdks/python/apache_beam/coders/coder_impl.py | 64 ++++++++++++++++++++ sdks/python/apache_beam/coders/coders.py | 43 +++++++++++++ .../apache_beam/coders/coders_test_common.py | 8 +++ 4 files changed, 123 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 0c92c5b..ee1590d 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -59,6 +59,14 @@ cdef class DeterministicPickleCoderImpl(CoderImpl): cdef bint _check_safe(self, value) except -1 +cdef object NoneType +cdef char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE, FLOAT_TYPE +cdef char STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE + +cdef class FastPrimitivesCoderImpl(StreamCoderImpl): + cdef CoderImpl fallback_coder_impl + + cdef class BytesCoderImpl(CoderImpl): pass http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index ca64d9c..473cff5 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -26,6 +26,7 @@ coder_impl.pxd file for type hints. """ import collections +from types import NoneType # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports @@ -147,6 +148,69 @@ class DeterministicPickleCoderImpl(CoderImpl): return self._pickle_coder.decode(encoded) +UNKNOWN_TYPE = 0xFF +NONE_TYPE = 0 +INT_TYPE = 1 +FLOAT_TYPE = 2 +STR_TYPE = 3 +UNICODE_TYPE = 4 +LIST_TYPE = 5 +TUPLE_TYPE = 6 + + +class FastPrimitivesCoderImpl(StreamCoderImpl): + + def __init__(self, fallback_coder_impl): + self.fallback_coder_impl = fallback_coder_impl + + def encode_to_stream(self, value, stream, nested): + t = type(value) + if t is NoneType: + stream.write_byte(NONE_TYPE) + elif t is int: + stream.write_byte(INT_TYPE) + stream.write_var_int64(value) + elif t is float: + stream.write_byte(FLOAT_TYPE) + stream.write_bigendian_double(value) + elif t is str: + stream.write_byte(STR_TYPE) + stream.write(value, nested) + elif t is unicode: + stream.write_byte(UNICODE_TYPE) + stream.write(value.encode('utf-8'), nested) + elif t is list or t is tuple: + stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE) + stream.write_var_int64(len(value)) + for e in value: + self.encode_to_stream(e, stream, True) + else: + stream.write_byte(UNKNOWN_TYPE) + self.fallback_coder_impl.encode_to_stream(value, stream, nested) + + def decode_from_stream(self, stream, nested): + t = stream.read_byte() + if t == NONE_TYPE: + return None + elif t == INT_TYPE: + return stream.read_var_int64() + elif t == FLOAT_TYPE: + return stream.read_bigendian_double() + elif t == STR_TYPE: + return stream.read_all(nested) + elif t == UNICODE_TYPE: + return stream.read_all(nested).decode('utf-8') + elif t == LIST_TYPE or t == TUPLE_TYPE: + vlen = stream.read_var_int64() + 
vlist = [self.decode_from_stream(stream, True) for _ in range(vlen)] + if t == LIST_TYPE: + return vlist + else: + return tuple(vlist) + else: + return self.fallback_coder_impl.decode_from_stream(stream, nested) + + class BytesCoderImpl(CoderImpl): """A coder for bytes/str objects.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 619586f..cf5ca6d 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -354,6 +354,49 @@ class DeterministicPickleCoder(FastCoder): return self +class FastPrimitivesCoder(FastCoder): + """Encodes simple primitives (e.g. str, int) efficiently. + + For unknown types, falls back to another coder (e.g. PickleCoder). + """ + def __init__(self, fallback_coder): + self._fallback_coder = fallback_coder + + def _create_impl(self): + return coder_impl.FastPrimitivesCoderImpl( + self._fallback_coder.get_impl()) + + def is_deterministic(self): + return self._fallback_coder.is_deterministic() + + def as_cloud_object(self, is_pair_like=True): + value = super(FastCoder, self).as_cloud_object() + # We currently use this coder in places where we cannot infer the coder to + # use for the value type in a more granular way. In places where the + # service expects a pair, it checks for the "is_pair_like" key, in which + # case we would fail without the hack below. + if is_pair_like: + value['is_pair_like'] = True + value['component_encodings'] = [ + self.as_cloud_object(is_pair_like=False), + self.as_cloud_object(is_pair_like=False) + ] + + return value + + # We allow .key_coder() and .value_coder() to be called on PickleCoder since + # we can't always infer the return values of lambdas in ParDo operations, the + # result of which may be used in a GroupBykey. 
+ def is_kv_coder(self): + return True + + def key_coder(self): + return self + + def value_coder(self): + return self + + class Base64PickleCoder(Coder): """Coder of objects by Python pickle, then base64 encoding.""" # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 0266fdc..b084947 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -111,6 +111,14 @@ class CodersTest(unittest.TestCase): coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())), (1, cell_value)) + def test_fast_primitives_coder(self): + coder = coders.FastPrimitivesCoder(coders.SingletonCoder(len)) + self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101') + self.check_coder(coder, (), (1, 2, 3)) + self.check_coder(coder, [], [1, 2, 3]) + self.check_coder(coder, len) + self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,)) + def test_bytes_coder(self): self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)