Implement a coder optimized for encoding primitive types.

This coder falls back to another coder (e.g. PickleCoder)
for other types, but is much faster for ints, strings, etc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f58c2bda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f58c2bda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f58c2bda

Branch: refs/heads/python-sdk
Commit: f58c2bdaae241718e0b4e250a90304317445613f
Parents: 69f895a
Author: Robert Bradshaw <rober...@google.com>
Authored: Tue Jul 12 15:58:56 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Mon Jul 18 17:48:33 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.pxd   |  8 +++
 sdks/python/apache_beam/coders/coder_impl.py    | 64 ++++++++++++++++++++
 sdks/python/apache_beam/coders/coders.py        | 43 +++++++++++++
 .../apache_beam/coders/coders_test_common.py    |  8 +++
 4 files changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 0c92c5b..ee1590d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -59,6 +59,14 @@ cdef class DeterministicPickleCoderImpl(CoderImpl):
   cdef bint _check_safe(self, value) except -1
 
 
+cdef object NoneType
+cdef unsigned char UNKNOWN_TYPE, NONE_TYPE, INT_TYPE  # unsigned: holds 0xFF
+cdef unsigned char FLOAT_TYPE, STR_TYPE, UNICODE_TYPE, LIST_TYPE, TUPLE_TYPE
+
+cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
+  cdef CoderImpl fallback_coder_impl  # used for values of unhandled types
+
+
 cdef class BytesCoderImpl(CoderImpl):
   pass
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index ca64d9c..473cff5 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -26,6 +26,7 @@ coder_impl.pxd file for type hints.
 """
 
 import collections
+from types import NoneType
 
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -147,6 +148,69 @@ class DeterministicPickleCoderImpl(CoderImpl):
     return self._pickle_coder.decode(encoded)
 
 
+UNKNOWN_TYPE = 0xFF  # Tag: value is encoded by the fallback coder impl.
+NONE_TYPE = 0  # Tag: None; no payload follows.
+INT_TYPE = 1  # Tag: int, written with write_var_int64.
+FLOAT_TYPE = 2  # Tag: float, written as a big-endian double.
+STR_TYPE = 3  # Tag: str, passed to stream.write().
+UNICODE_TYPE = 4  # Tag: unicode, written as its UTF-8 bytes.
+LIST_TYPE = 5  # Tag: list; var_int64 length, then each element (nested).
+TUPLE_TYPE = 6  # Tag: tuple; same layout as LIST_TYPE.
+
+
+class FastPrimitivesCoderImpl(StreamCoderImpl):
+  """Writes a type-tag byte, then the payload; other types use a fallback."""
+  def __init__(self, fallback_coder_impl):
+    self.fallback_coder_impl = fallback_coder_impl
+
+  def encode_to_stream(self, value, stream, nested):
+    t = type(value)  # exact match: subclasses (e.g. bool) use the fallback
+    if t is NoneType:
+      stream.write_byte(NONE_TYPE)
+    elif t is int:
+      stream.write_byte(INT_TYPE)
+      stream.write_var_int64(value)
+    elif t is float:
+      stream.write_byte(FLOAT_TYPE)
+      stream.write_bigendian_double(value)
+    elif t is str:
+      stream.write_byte(STR_TYPE)
+      stream.write(value, nested)
+    elif t is unicode:
+      stream.write_byte(UNICODE_TYPE)
+      stream.write(value.encode('utf-8'), nested)
+    elif t is list or t is tuple:
+      stream.write_byte(LIST_TYPE if t is list else TUPLE_TYPE)
+      stream.write_var_int64(len(value))  # element count, then elements
+      for e in value:
+        self.encode_to_stream(e, stream, True)  # elements always nested
+    else:
+      stream.write_byte(UNKNOWN_TYPE)
+      self.fallback_coder_impl.encode_to_stream(value, stream, nested)
+
+  def decode_from_stream(self, stream, nested):
+    t = stream.read_byte()  # the tag written by encode_to_stream
+    if t == NONE_TYPE:
+      return None
+    elif t == INT_TYPE:
+      return stream.read_var_int64()
+    elif t == FLOAT_TYPE:
+      return stream.read_bigendian_double()
+    elif t == STR_TYPE:
+      return stream.read_all(nested)
+    elif t == UNICODE_TYPE:
+      return stream.read_all(nested).decode('utf-8')
+    elif t == LIST_TYPE or t == TUPLE_TYPE:
+      vlen = stream.read_var_int64()
+      vlist = [self.decode_from_stream(stream, True) for _ in range(vlen)]
+      if t == LIST_TYPE:
+        return vlist
+      else:
+        return tuple(vlist)
+    else:  # any unmatched tag (normally UNKNOWN_TYPE) goes to the fallback
+      return self.fallback_coder_impl.decode_from_stream(stream, nested)
+
+
 class BytesCoderImpl(CoderImpl):
   """A coder for bytes/str objects."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 619586f..cf5ca6d 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -354,6 +354,49 @@ class DeterministicPickleCoder(FastCoder):
     return self
 
 
+class FastPrimitivesCoder(FastCoder):
+  """Encodes None, int, float, str, unicode, list and tuple efficiently.
+
+  Values of other types are encoded by fallback_coder (e.g. PickleCoder).
+  """
+  def __init__(self, fallback_coder):
+    self._fallback_coder = fallback_coder
+
+  def _create_impl(self):
+    return coder_impl.FastPrimitivesCoderImpl(
+        self._fallback_coder.get_impl())
+
+  def is_deterministic(self):
+    return self._fallback_coder.is_deterministic()
+
+  def as_cloud_object(self, is_pair_like=True):
+    value = super(FastCoder, self).as_cloud_object()  # skips FastCoder in MRO
+    # We currently use this coder in places where we cannot infer the coder to
+    # use for the value type in a more granular way.  In places where the
+    # service expects a pair, it checks for the "is_pair_like" key, in which
+    # case we would fail without the hack below.
+    if is_pair_like:
+      value['is_pair_like'] = True
+      value['component_encodings'] = [
+          self.as_cloud_object(is_pair_like=False),
+          self.as_cloud_object(is_pair_like=False)
+      ]
+
+    return value
+
+  # We allow .key_coder() and .value_coder() to be called on this coder since
+  # we can't always infer the return values of lambdas in ParDo operations, the
+  # result of which may be used in a GroupByKey.
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
 class Base64PickleCoder(Coder):
   """Coder of objects by Python pickle, then base64 encoding."""
   # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 0266fdc..b084947 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -111,6 +111,14 @@ class CodersTest(unittest.TestCase):
         coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
         (1, cell_value))
 
+  def test_fast_primitives_coder(self):
+    coder = coders.FastPrimitivesCoder(coders.SingletonCoder(len))
+    self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101')
+    self.check_coder(coder, (), (1, 2, 3))
+    self.check_coder(coder, [], [1, 2, 3], [1, [2.5, u'x'], (None, len)])
+    self.check_coder(coder, len)  # only encodable via the fallback
+    self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))  # nested use
+
   def test_bytes_coder(self):
     self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
 

Reply via email to