Repository: incubator-beam Updated Branches: refs/heads/python-sdk b33fdfdad -> 7e870613f
Fix a couple more coder vs. element-coder changes for element sizing. These are not actually correct, but are needed to support the Dataflow service that was built to work with code before PR 1224. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/beb28453 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/beb28453 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/beb28453 Branch: refs/heads/python-sdk Commit: beb284539608cce3800ef77741a7d7cb9766b490 Parents: b33fdfd Author: Robert Bradshaw <rober...@gmail.com> Authored: Mon Oct 31 22:31:08 2016 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Mon Oct 31 22:31:08 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coder_impl.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/beb28453/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 1ff3a44..40fc1fd 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -152,6 +152,14 @@ class CallbackCoderImpl(CoderImpl): def estimate_size(self, value, nested=False): return self._get_nested_size(self._size_estimator(value), nested) + def get_estimated_size_and_observables(self, value, nested=False): + # TODO(robertwb): Remove this once all coders are correct. + if isinstance(value, observable.ObservableMixin): + # CallbackCoderImpl can presumably encode the elements too. + return 1, [(value, self)] + else: + return self.estimate_size(value, nested), [] + class DeterministicPickleCoderImpl(CoderImpl): @@ -523,6 +531,10 @@ class WindowedValueCoderImpl(StreamCoderImpl): def get_estimated_size_and_observables(self, value, nested=False): """Returns estimated size of value along with any nested observables.""" + if isinstance(value, observable.ObservableMixin): + # Should never be here. + # TODO(robertwb): Remove when coders are set correctly. + return 0, [(value, self._value_coder)] estimated_size = 0 observables = [] value_estimated_size, value_observables = (