This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch revert-15357-faster-boundedsource-sdfing in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5d5e6928f7f608eff97c2ccf41ed4e6b9c73729e Author: Chamikara Jayalath <[email protected]> AuthorDate: Wed Aug 25 17:23:39 2021 -0700 Revert "Merge pull request #15357 from [BEAM-12781] Add memoization of BoundedSource encoding for SDFBoundedSourceReader" This reverts commit 85d46e15725389813a3c8e4afae8b65402e0ca04. --- sdks/python/apache_beam/coders/coders.py | 43 ---------------------- .../apache_beam/coders/coders_test_common.py | 4 -- sdks/python/apache_beam/io/iobase.py | 24 ++---------- 3 files changed, 3 insertions(+), 68 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 370110d..2d2b336 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -18,27 +18,11 @@ """Collection of useful coders. Only those coders listed in __all__ are part of the public API of this module. - -## On usage of `pickle`, `dill` and `pickler` in Beam - -In Beam, we generally we use `pickle` for pipeline elements and `dill` for -more complex types, like user functions. - -`pickler` is Beam's own wrapping of dill + compression + error handling. -It serves also as an API to mask the actual encoding layer (so we can -change it from `dill` if necessary). - -We created `_MemoizingPickleCoder` to improve performance when serializing -complex user types for the execution of SDF. Specifically to address -BEAM-12781, where many identical `BoundedSource` instances are being -encoded. - """ # pytype: skip-file import base64 import pickle -from functools import lru_cache from typing import TYPE_CHECKING from typing import Any from typing import Callable @@ -757,33 +741,6 @@ class _PickleCoderBase(FastCoder): return hash(type(self)) -class _MemoizingPickleCoder(_PickleCoderBase): - """Coder using Python's pickle functionality with memoization.""" - def __init__(self, cache_size=16): - super(_MemoizingPickleCoder, self).__init__() - self.cache_size = cache_size - - def _create_impl(self): - from apache_beam.internal import pickler - dumps = pickler.dumps - - mdumps = lru_cache(maxsize=self.cache_size, typed=True)(dumps) - - def _nonhashable_dumps(x): - try: - return mdumps(x) - except TypeError: - return dumps(x) - - return coder_impl.CallbackCoderImpl(_nonhashable_dumps, pickler.loads) - - def as_deterministic_coder(self, step_label, error_message=None): - return FastPrimitivesCoder(self, requires_deterministic=step_label) - - def to_type_hint(self): - return Any - - class PickleCoder(_PickleCoderBase): """Coder using Python's pickle functionality.""" def _create_impl(self): diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 44ba749..11334ed 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -207,10 +207,6 @@ class CodersTest(unittest.TestCase): coder = coders.PickleCoder() self.check_coder(coder, *self.test_values) - def test_memoizing_pickle_coder(self): - coder = coders._MemoizingPickleCoder() - self.check_coder(coder, *self.test_values) - def test_deterministic_coder(self): coder = coders.FastPrimitivesCoder() deterministic_coder = coders.DeterministicFastPrimitivesCoder(coder, 'step') diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 8e81da3..5d8e5df 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -44,8 +44,6 @@ from typing import Union from apache_beam import coders from apache_beam import pvalue -from apache_beam.coders.coders import _MemoizingPickleCoder -from apache_beam.internal import pickler from apache_beam.portability import common_urns from apache_beam.portability import python_urns from apache_beam.portability.api import beam_runner_api_pb2 @@ -888,14 +886,12 @@ class Read(ptransform.PTransform): def expand(self, pbegin): if isinstance(self.source, BoundedSource): - coders.registry.register_coder(BoundedSource, _MemoizingPickleCoder) display_data = self.source.display_data() or {} display_data['source'] = self.source.__class__ - return ( pbegin | Impulse() - | core.Map(lambda _: self.source).with_output_types(BoundedSource) + | core.Map(lambda _: self.source) | SDFBoundedSourceReader(display_data)) elif isinstance(self.source, ptransform.PTransform): # The Read transform can also admit a full PTransform as an input @@ -1577,18 +1573,6 @@ class _SDFBoundedSourceRestrictionTracker(RestrictionTracker): return True -class _SDFBoundedSourceWrapperRestrictionCoder(coders.Coder): - def decode(self, value): - return _SDFBoundedSourceRestriction(SourceBundle(*pickler.loads(value))) - - def encode(self, restriction): - return pickler.dumps(( - restriction._source_bundle.weight, - restriction._source_bundle.source, - restriction._source_bundle.start_position, - restriction._source_bundle.stop_position)) - - class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider): """ A `RestrictionProvider` that is used by SDF for `BoundedSource`. @@ -1596,10 +1580,8 @@ class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider): This restriction provider initializes restriction based on input element that is expected to be of BoundedSource type. """ - def __init__(self, desired_chunk_size=None, restriction_coder=None): + def __init__(self, desired_chunk_size=None): self._desired_chunk_size = desired_chunk_size - self._restriction_coder = ( - restriction_coder or _SDFBoundedSourceWrapperRestrictionCoder()) def _check_source(self, src): if not isinstance(src, BoundedSource): @@ -1636,7 +1618,7 @@ class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider): return restriction.weight() def restriction_coder(self): - return self._restriction_coder + return coders.DillCoder() class SDFBoundedSourceReader(PTransform):
