DeterministicCoder rename
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5d14b080 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5d14b080 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5d14b080 Branch: refs/heads/python-sdk Commit: 5d14b0807a6647f6736beae1cbb8da6553505a09 Parents: e1e4b7c Author: Vikas Kedigehalli <[email protected]> Authored: Tue Nov 1 16:08:32 2016 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Mon Nov 7 13:16:43 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coder_impl.py | 21 +++++++++++++------- sdks/python/apache_beam/coders/coders.py | 12 +++++------ .../apache_beam/coders/coders_test_common.py | 13 ++++++------ sdks/python/apache_beam/coders/typecoders.py | 5 +++-- 4 files changed, 30 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5d14b080/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index be15a7d..d075814 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -161,10 +161,10 @@ class CallbackCoderImpl(CoderImpl): return self.estimate_size(value, nested), [] -class DeterministicPickleCoderImpl(CoderImpl): +class DeterministicFastPrimitivesCoderImpl(CoderImpl): - def __init__(self, pickle_coder, step_label): - self._pickle_coder = pickle_coder + def __init__(self, coder, step_label): + self._underlying_coder = coder self._step_label = step_label def _check_safe(self, value): @@ -183,17 +183,24 @@ class DeterministicPickleCoderImpl(CoderImpl): def encode_to_stream(self, value, stream, nested): self._check_safe(value) - return self._pickle_coder.encode_to_stream(value, stream, nested) + return self._underlying_coder.encode_to_stream(value, stream, nested) def decode_from_stream(self, stream, nested): - return self._pickle_coder.decode_from_stream(stream, nested) + return self._underlying_coder.decode_from_stream(stream, nested) def encode(self, value): self._check_safe(value) - return self._pickle_coder.encode(value) + return self._underlying_coder.encode(value) def decode(self, encoded): - return self._pickle_coder.decode(encoded) + return self._underlying_coder.decode(encoded) + + def estimate_size(self, value, nested=False): + return self._underlying_coder.estimate_size(value, nested) + + def get_estimated_size_and_observables(self, value, nested=False): + return self._underlying_coder.get_estimated_size_and_observables( + value, nested) class ProtoCoderImpl(SimpleCoderImpl): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5d14b080/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index ed4ac92..1e78b1d 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -352,16 +352,16 @@ class DillCoder(_PickleCoderBase): return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads) -class DeterministicPickleCoder(FastCoder): - """Throws runtime errors when pickling non-deterministic values.""" +class DeterministicFastPrimitivesCoder(FastCoder): + """Throws runtime errors when encoding non-deterministic values.""" - def __init__(self, pickle_coder, step_label): - self._pickle_coder = pickle_coder + def __init__(self, coder, step_label): + self._underlying_coder = coder self._step_label = step_label def _create_impl(self): - return coder_impl.DeterministicPickleCoderImpl( - self._pickle_coder.get_impl(), self._step_label) + return coder_impl.DeterministicFastPrimitivesCoderImpl( + self._underlying_coder.get_impl(), self._step_label) def is_deterministic(self): return True http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5d14b080/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 40044aa..1af8347 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -104,15 +104,16 @@ class CodersTest(unittest.TestCase): def test_pickle_coder(self): self.check_coder(coders.PickleCoder(), 'a', 1, 1.5, (1, 2, 3)) - def test_deterministic_pickle_coder(self): - coder = coders.DeterministicPickleCoder(coders.PickleCoder(), 'step') - self.check_coder(coder, 'a', 1, 1.5, (1, 2, 3)) + def test_deterministic_coder(self): + coder = coders.FastPrimitivesCoder() + deterministic_coder = coders.DeterministicFastPrimitivesCoder(coder, 'step') + self.check_coder(deterministic_coder, 'a', 1, 1.5, (1, 2, 3)) with self.assertRaises(TypeError): - self.check_coder(coder, dict()) + self.check_coder(deterministic_coder, dict()) with self.assertRaises(TypeError): - self.check_coder(coder, [1, dict()]) + self.check_coder(deterministic_coder, [1, dict()]) - self.check_coder(coders.TupleCoder((coder, coders.PickleCoder())), + self.check_coder(coders.TupleCoder((deterministic_coder, coder)), (1, dict()), ('a', [dict()])) def test_dill_coder(self): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5d14b080/sdks/python/apache_beam/coders/typecoders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 1a01ccb..0681218 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -146,12 +146,13 @@ class CoderRegistry(object): 'and for custom key classes, by writing a ' 'deterministic custom Coder. Please see the ' 'documentation for more details.' % (key_coder, op_name)) - # TODO(vikasrk): Should we include other fallback coders? + # TODO(vikasrk): PickleCoder will eventually be removed once its direct + # usage is stopped. if isinstance(key_coder, (coders.PickleCoder, coders.FastPrimitivesCoder)): if not silent: logging.warning(error_msg) - return coders.DeterministicPickleCoder(key_coder, op_name) + return coders.DeterministicFastPrimitivesCoder(key_coder, op_name) else: raise ValueError(error_msg) else:
