This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch revert-15357-faster-boundedsource-sdfing
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5d5e6928f7f608eff97c2ccf41ed4e6b9c73729e
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Wed Aug 25 17:23:39 2021 -0700

    Revert "Merge pull request #15357 from [BEAM-12781] Add memoization of BoundedSource encoding for SDFBoundedSourceReader"
    
    This reverts commit 85d46e15725389813a3c8e4afae8b65402e0ca04.
---
 sdks/python/apache_beam/coders/coders.py           | 43 ----------------------
 .../apache_beam/coders/coders_test_common.py       |  4 --
 sdks/python/apache_beam/io/iobase.py               | 24 ++----------
 3 files changed, 3 insertions(+), 68 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 370110d..2d2b336 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -18,27 +18,11 @@
 """Collection of useful coders.
 
 Only those coders listed in __all__ are part of the public API of this module.
-
-## On usage of `pickle`, `dill` and `pickler` in Beam
-
-In Beam, we generally we use `pickle` for pipeline elements and `dill` for
-more complex types, like user functions.
-
-`pickler` is Beam's own wrapping of dill + compression + error handling.
-It serves also as an API to mask the actual encoding layer (so we can
-change it from `dill` if necessary).
-
-We created `_MemoizingPickleCoder` to improve performance when serializing
-complex user types for the execution of SDF. Specifically to address
-BEAM-12781, where many identical `BoundedSource` instances are being
-encoded.
-
 """
 # pytype: skip-file
 
 import base64
 import pickle
-from functools import lru_cache
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Callable
@@ -757,33 +741,6 @@ class _PickleCoderBase(FastCoder):
     return hash(type(self))
 
 
-class _MemoizingPickleCoder(_PickleCoderBase):
-  """Coder using Python's pickle functionality with memoization."""
-  def __init__(self, cache_size=16):
-    super(_MemoizingPickleCoder, self).__init__()
-    self.cache_size = cache_size
-
-  def _create_impl(self):
-    from apache_beam.internal import pickler
-    dumps = pickler.dumps
-
-    mdumps = lru_cache(maxsize=self.cache_size, typed=True)(dumps)
-
-    def _nonhashable_dumps(x):
-      try:
-        return mdumps(x)
-      except TypeError:
-        return dumps(x)
-
-    return coder_impl.CallbackCoderImpl(_nonhashable_dumps, pickler.loads)
-
-  def as_deterministic_coder(self, step_label, error_message=None):
-    return FastPrimitivesCoder(self, requires_deterministic=step_label)
-
-  def to_type_hint(self):
-    return Any
-
-
 class PickleCoder(_PickleCoderBase):
   """Coder using Python's pickle functionality."""
   def _create_impl(self):
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 44ba749..11334ed 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -207,10 +207,6 @@ class CodersTest(unittest.TestCase):
     coder = coders.PickleCoder()
     self.check_coder(coder, *self.test_values)
 
-  def test_memoizing_pickle_coder(self):
-    coder = coders._MemoizingPickleCoder()
-    self.check_coder(coder, *self.test_values)
-
   def test_deterministic_coder(self):
     coder = coders.FastPrimitivesCoder()
     deterministic_coder = coders.DeterministicFastPrimitivesCoder(coder, 'step')
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 8e81da3..5d8e5df 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -44,8 +44,6 @@ from typing import Union
 
 from apache_beam import coders
 from apache_beam import pvalue
-from apache_beam.coders.coders import _MemoizingPickleCoder
-from apache_beam.internal import pickler
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -888,14 +886,12 @@ class Read(ptransform.PTransform):
 
   def expand(self, pbegin):
     if isinstance(self.source, BoundedSource):
-      coders.registry.register_coder(BoundedSource, _MemoizingPickleCoder)
       display_data = self.source.display_data() or {}
       display_data['source'] = self.source.__class__
-
       return (
           pbegin
           | Impulse()
-          | core.Map(lambda _: self.source).with_output_types(BoundedSource)
+          | core.Map(lambda _: self.source)
           | SDFBoundedSourceReader(display_data))
     elif isinstance(self.source, ptransform.PTransform):
       # The Read transform can also admit a full PTransform as an input
@@ -1577,18 +1573,6 @@ class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
     return True
 
 
-class _SDFBoundedSourceWrapperRestrictionCoder(coders.Coder):
-  def decode(self, value):
-    return _SDFBoundedSourceRestriction(SourceBundle(*pickler.loads(value)))
-
-  def encode(self, restriction):
-    return pickler.dumps((
-        restriction._source_bundle.weight,
-        restriction._source_bundle.source,
-        restriction._source_bundle.start_position,
-        restriction._source_bundle.stop_position))
-
-
 class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
   """
   A `RestrictionProvider` that is used by SDF for `BoundedSource`.
@@ -1596,10 +1580,8 @@ class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
   This restriction provider initializes restriction based on input
   element that is expected to be of BoundedSource type.
   """
-  def __init__(self, desired_chunk_size=None, restriction_coder=None):
+  def __init__(self, desired_chunk_size=None):
     self._desired_chunk_size = desired_chunk_size
-    self._restriction_coder = (
-        restriction_coder or _SDFBoundedSourceWrapperRestrictionCoder())
 
   def _check_source(self, src):
     if not isinstance(src, BoundedSource):
@@ -1636,7 +1618,7 @@ class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
     return restriction.weight()
 
   def restriction_coder(self):
-    return self._restriction_coder
+    return coders.DillCoder()
 
 
 class SDFBoundedSourceReader(PTransform):

Reply via email to