Repository: incubator-beam Updated Branches: refs/heads/python-sdk 03c3c7074 -> 979e299c7
Implement size observation for FastPrimitivesCoderImpl. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fba0e91b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fba0e91b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fba0e91b Branch: refs/heads/python-sdk Commit: fba0e91b18c542def24e560e44484fc825feeca6 Parents: 4ffdd88 Author: Robert Bradshaw <rober...@gmail.com> Authored: Fri Oct 28 15:35:15 2016 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Mon Oct 31 09:07:34 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coder_impl.py | 9 +++++++++ 1 file changed, 9 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fba0e91b/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index b31e493..c73dd31 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -214,6 +214,15 @@ class FastPrimitivesCoderImpl(StreamCoderImpl): def __init__(self, fallback_coder_impl): self.fallback_coder_impl = fallback_coder_impl + def get_estimated_size_and_observables(self, value, nested=False): + if isinstance(value, observable.ObservableMixin): + # FastPrimitivesCoderImpl can presumably encode the elements too. + return 1, [(value, self)] + else: + out = ByteCountingOutputStream() + self.encode_to_stream(value, out, nested) + return out.get_count(), [] + def encode_to_stream(self, value, stream, nested): t = type(value) if t is NoneType: