Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 03c3c7074 -> 979e299c7


Implement size observation for FastPrimitivesCoderImpl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fba0e91b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fba0e91b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fba0e91b

Branch: refs/heads/python-sdk
Commit: fba0e91b18c542def24e560e44484fc825feeca6
Parents: 4ffdd88
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Fri Oct 28 15:35:15 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Mon Oct 31 09:07:34 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fba0e91b/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index b31e493..c73dd31 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -214,6 +214,15 @@ class FastPrimitivesCoderImpl(StreamCoderImpl):
   def __init__(self, fallback_coder_impl):
     self.fallback_coder_impl = fallback_coder_impl
 
+  def get_estimated_size_and_observables(self, value, nested=False):
+    if isinstance(value, observable.ObservableMixin):
+      # FastPrimitivesCoderImpl can presumably encode the elements too.
+      return 1, [(value, self)]
+    else:
+      out = ByteCountingOutputStream()
+      self.encode_to_stream(value, out, nested)
+      return out.get_count(), []
+
   def encode_to_stream(self, value, stream, nested):
     t = type(value)
     if t is NoneType:

Reply via email to