robertwb commented on a change in pull request #15357:
URL: https://github.com/apache/beam/pull/15357#discussion_r694269155



##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -886,12 +888,18 @@ def get_desired_chunk_size(total_size):
 
   def expand(self, pbegin):
     if isinstance(self.source, BoundedSource):
+      coders.registry.register_coder(BoundedSource, _MemoizingPickleCoder)
       display_data = self.source.display_data() or {}
       display_data['source'] = self.source.__class__
+
+      def output_source(_) -> BoundedSource:
+        assert isinstance(self.source, BoundedSource)

Review comment:
       Isn't this assert redundant with the `isinstance` check in the if statement a couple of lines above?

##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1573,15 +1581,28 @@ def is_bounded(self):
     return True
 
 
+class _RestrictionCoder(coders.Coder):

Review comment:
       Maybe call this _SdfBoundedSourceRestrictionCoder or similar? 

##########
File path: sdks/python/apache_beam/coders/coders.py
##########
@@ -741,6 +742,36 @@ def __hash__(self):
     return hash(type(self))
 
 
+class _MemoizingPickleCoder(_PickleCoderBase):
+  """Coder using Python's pickle functionality with memoization."""
+  def __init__(self, cache_size=16):
+    super(_MemoizingPickleCoder, self).__init__()
+    self.cache_size = cache_size
+
+  def _create_impl(self):
+    dumps = pickle.dumps

Review comment:
       Should we be using pickler rather than raw pickle here? 
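
For context, the memoization idea behind a coder like `_MemoizingPickleCoder` can be sketched with the standard library alone. This is a hypothetical illustration, not the PR's implementation: the class name `MemoizingPickler` and its methods are made up for the example, and it uses raw `pickle` only to stay self-contained. The same structure would apply if `pickle.dumps`/`pickle.loads` were swapped for Beam's `pickler` functions.

```python
import pickle
from functools import lru_cache


class MemoizingPickler:
    """Hypothetical stand-in for a memoizing pickle coder (not Beam's API)."""

    def __init__(self, cache_size=16):
        # Cache the encoded bytes, keyed by the (hashable) value being encoded.
        self._cached_dumps = lru_cache(maxsize=cache_size, typed=True)(pickle.dumps)

    def encode(self, value):
        try:
            return self._cached_dumps(value)
        except TypeError:
            # Unhashable values (e.g. lists) cannot be cache keys,
            # so fall back to encoding them directly.
            return pickle.dumps(value)

    def decode(self, encoded):
        return pickle.loads(encoded)

    def cache_info(self):
        # Expose hit/miss counters for inspection.
        return self._cached_dumps.cache_info()
```

Encoding the same hashable value twice serializes it only once; the second call is served from the cache, which is the point of memoizing when one source object is emitted many times.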

##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1573,15 +1581,28 @@ def is_bounded(self):
     return True
 
 
+class _RestrictionCoder(coders.Coder):
+  def decode(self, value):
+    return _SDFBoundedSourceRestriction(SourceBundle(*pickler.loads(value)))
+
+  def encode(self, restriction):
+    return pickler.dumps((

Review comment:
       Should we be using dill/pickler here to avoid regressions?
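
The regression concern is presumably that the standard `pickle` module rejects some objects that a dill-based pickler accepts, such as lambdas and locally defined functions. The sketch below only demonstrates the standard-library limitation; `try_pickle` and `make_local_function` are hypothetical helpers written for this example, and the tuple is an arbitrary stand-in for restriction fields.

```python
import pickle


def try_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type varies by object and Python version.
        return False


def make_local_function():
    # A function defined inside another function has no importable name.
    def local_fn(x):
        return x + 1
    return local_fn


plain_ok = try_pickle((0, 100, "some/path"))      # plain data
lambda_ok = try_pickle(lambda x: x)               # lambda
local_ok = try_pickle(make_local_function())      # locally defined function
print(plain_ok, lambda_ok, local_ok)
```

A source or restriction that holds such a callable would stop serializing if the coder moved from a dill-based pickler to raw `pickle`.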

##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -886,12 +888,18 @@ def get_desired_chunk_size(total_size):
 
   def expand(self, pbegin):
     if isinstance(self.source, BoundedSource):
+      coders.registry.register_coder(BoundedSource, _MemoizingPickleCoder)
       display_data = self.source.display_data() or {}
       display_data['source'] = self.source.__class__
+
+      def output_source(_) -> BoundedSource:
+        assert isinstance(self.source, BoundedSource)
+        return self.source
+
       return (
           pbegin
           | Impulse()
-          | core.Map(lambda _: self.source)
+          | core.Map(output_source)

Review comment:
       Nit: "output_source" sounds like a noun, not a verb. We could call this 
"emit_source." Alternatively, you could leave this as a lambda and do 
`core.Map(lambda _: self.source).with_output_types(BoundedSource)`.
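
The difference between the two options comes down to where the output type is declared. A named function can carry it as a return annotation, while a lambda has no annotations, so the type has to be attached separately (in Beam, via `with_output_types`). The sketch below shows only the annotation side using the standard library; the `BoundedSource` class here is an empty stand-in, not Beam's class, and `emit_source` follows the name suggested in the comment.

```python
from typing import get_type_hints


class BoundedSource:
    """Empty stand-in for illustration only (not apache_beam's class)."""


source = BoundedSource()


def emit_source(_) -> BoundedSource:
    # The return annotation is introspectable by a framework.
    return source


# The equivalent lambda returns the same object but declares no types.
emit_source_lambda = lambda _: source

named_hints = get_type_hints(emit_source)
lambda_hints = get_type_hints(emit_source_lambda)
print(named_hints)
print(lambda_hints)
```

Both callables behave identically at runtime; only the named function exposes a return type that can be read from the callable itself.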



