[ 
https://issues.apache.org/jira/browse/BEAM-3812?focusedWorklogId=92995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92995
 ]

ASF GitHub Bot logged work on BEAM-3812:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/18 00:57
            Start Date: 20/Apr/18 00:57
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #5174: [BEAM-3812] Avoid 
pickling composite transforms.
URL: https://github.com/apache/beam/pull/5174
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index f5ca17e64a1..e45dd23bfef 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -152,6 +152,11 @@ def expand(self, pvalue):
       pcoll.element_type = bytes
     return pcoll
 
+  def to_runner_api_parameter(self, context):
+    # Required as this is identified by type in PTransformOverrides.
+    # TODO(BEAM-3812): Use an actual URN here.
+    return self.to_runner_api_pickled(context)
+
 
 class ReadStringsFromPubSub(PTransform):
   """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.
@@ -193,6 +198,11 @@ def expand(self, pcoll):
     pcoll.element_type = bytes
     return pcoll | Write(self._sink)
 
+  def to_runner_api_parameter(self, context):
+    # Required as this is identified by type in PTransformOverrides.
+    # TODO(BEAM-3812): Use an actual URN here.
+    return self.to_runner_api_pickled(context)
+
 
 PROJECT_ID_REGEXP = '[a-z][-a-z0-9:.]{4,61}[a-z0-9]'
 SUBSCRIPTION_REGEXP = 'projects/([^/]+)/subscriptions/(.+)'
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 74bd4cb17d0..31fe5c51952 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -572,7 +572,7 @@ class Visitor(PipelineVisitor):  # pylint: disable=used-before-assignment
       ok = True  # Really a nonlocal.
 
       def enter_composite_transform(self, transform_node):
-        self.visit_transform(transform_node)
+        pass
 
       def visit_transform(self, transform_node):
         try:
@@ -822,7 +822,7 @@ def transform_to_runner_api(transform, context):
       if transform is None:
         return None
       else:
-        return transform.to_runner_api(context)
+        return transform.to_runner_api(context, has_parts=bool(self.parts))
     return beam_runner_api_pb2.PTransform(
         unique_name=self.full_label,
         spec=transform_to_runner_api(self.transform, context),
@@ -893,6 +893,13 @@ class PTransformOverride(object):
   def matches(self, applied_ptransform):
     """Determines whether the given AppliedPTransform matches.
 
+    Note that the matching will happen *after* Runner API proto translation.
+    If matching is done via type checks, to/from_runner_api[_parameter] methods
+    must be implemented to preserve the type (and other data) through proto
+    serialization.
+
+    Consider URN-based translation instead.
+
     Args:
       applied_ptransform: AppliedPTransform to be matched.
 
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c3dd2296f20..ed27aa7658b 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -88,6 +88,9 @@ class DoubleParDo(beam.PTransform):
   def expand(self, input):
     return input | 'Inner' >> beam.Map(lambda a: a * 2)
 
+  def to_runner_api_parameter(self, context):
+    return self.to_runner_api_pickled(context)
+
 
 class TripleParDo(beam.PTransform):
   def expand(self, input):
@@ -524,36 +527,6 @@ def test_dir(self):
 
 class RunnerApiTest(unittest.TestCase):
 
-  def test_simple(self):
-    """Tests serializing, deserializing, and running a simple pipeline.
-
-    More extensive tests are done at pipeline.run for each suitable test.
-    """
-    p = beam.Pipeline()
-    p | beam.Create([None]) | beam.Map(lambda x: x)  # pylint: disable=expression-not-assigned
-    proto = p.to_runner_api()
-
-    p2 = Pipeline.from_runner_api(proto, p.runner, p._options)
-    p2.run()
-
-  def test_pickling(self):
-    class MyPTransform(beam.PTransform):
-      pickle_count = [0]
-
-      def expand(self, p):
-        self.p = p
-        return p | beam.Create([None])
-
-      def __reduce__(self):
-        self.pickle_count[0] += 1
-        return str, ()
-
-    p = beam.Pipeline()
-    for k in range(20):
-      p | 'Iter%s' % k >> MyPTransform()  # pylint: disable=expression-not-assigned
-    p.to_runner_api()
-    self.assertEqual(MyPTransform.pickle_count[0], 20)
-
   def test_parent_pointer(self):
     class MyPTransform(beam.PTransform):
 
diff --git a/sdks/python/apache_beam/portability/python_urns.py b/sdks/python/apache_beam/portability/python_urns.py
index a284b5fe66c..fd8dbbaa9bb 100644
--- a/sdks/python/apache_beam/portability/python_urns.py
+++ b/sdks/python/apache_beam/portability/python_urns.py
@@ -24,7 +24,9 @@
 PICKLED_DOFN = "beam:dofn:pickled_python:v1"
 PICKLED_DOFN_INFO = "beam:dofn:pickled_python_info:v1"
 PICKLED_SOURCE = "beam:source:pickled_python:v1"
-PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v1"
+PICKLED_TRANSFORM = "beam:transform:pickled_python:v1"
 PICKLED_WINDOW_MAPPING_FN = "beam:window_mapping_fn:pickled_python:v1"
 PICKLED_WINDOWFN = "beam:windowfn:pickled_python:v1"
 PICKLED_VIEWFN = "beam:view_fn:pickled_python_data:v1"
+
+GENERIC_COMPOSITE_TRANSFORM = "beam:transform:generic_composite:v1"
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index eefbc85f542..468d86df532 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1730,6 +1730,11 @@ def __init__(self, value):
       value = value.items()
     self.value = tuple(value)
 
+  def to_runner_api_parameter(self, context):
+    # Required as this is identified by type in PTransformOverrides.
+    # TODO(BEAM-3812): Use an actual URN here.
+    return self.to_runner_api_pickled(context)
+
   def infer_output_type(self, unused_input_type):
     if not self.value:
       return Any
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 9d9fe149497..889372f9266 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -45,7 +45,7 @@ class and wrapper class that allows lambda functions to be used as
 import threading
 from functools import reduce
 
-from google.protobuf import wrappers_pb2
+from google.protobuf import message
 
 from apache_beam import error
 from apache_beam import pvalue
@@ -537,13 +537,17 @@ def register(constructor):
       # Used as a decorator.
       return register
 
-  def to_runner_api(self, context):
+  def to_runner_api(self, context, has_parts=False):
     from apache_beam.portability.api import beam_runner_api_pb2
     urn, typed_param = self.to_runner_api_parameter(context)
+    if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
+      # TODO(BEAM-3812): Remove this fallback.
+      urn, typed_param = self.to_runner_api_pickled(context)
     return beam_runner_api_pb2.FunctionSpec(
         urn=urn,
         payload=typed_param.SerializeToString()
-        if typed_param is not None else None)
+        if isinstance(typed_param, message.Message)
+        else typed_param)
 
   @classmethod
   def from_runner_api(cls, proto, context):
@@ -554,19 +558,26 @@ def from_runner_api(cls, proto, context):
         proto_utils.parse_Bytes(proto.payload, parameter_type),
         context)
 
-  def to_runner_api_parameter(self, context):
+  def to_runner_api_parameter(self, unused_context):
+    # The payload here is just to ease debugging.
+    return (python_urns.GENERIC_COMPOSITE_TRANSFORM,
+            getattr(self, '_fn_api_payload', str(self)))
+
+  def to_runner_api_pickled(self, unused_context):
     return (python_urns.PICKLED_TRANSFORM,
-            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+            pickler.dumps(self))
+
 
-  @staticmethod
-  def from_runner_api_parameter(spec_parameter, unused_context):
-    return pickler.loads(spec_parameter.value)
+@PTransform.register_urn(python_urns.GENERIC_COMPOSITE_TRANSFORM, None)
+def _create_transform(payload, unused_context):
+  empty_transform = PTransform()
+  empty_transform._fn_api_payload = payload
+  return empty_transform
 
 
-PTransform.register_urn(
-    python_urns.PICKLED_TRANSFORM,
-    wrappers_pb2.BytesValue,
-    PTransform.from_runner_api_parameter)
+@PTransform.register_urn(python_urns.PICKLED_TRANSFORM, None)
+def _unpickle_transform(pickled_bytes, unused_context):
+  return pickler.loads(pickled_bytes)
 
 
 class _ChainedPTransform(PTransform):
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index e62fbcd0948..a2f040f881a 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -20,6 +20,7 @@
 import abc
 import inspect
 
+from google.protobuf import message
 from google.protobuf import wrappers_pb2
 
 from apache_beam.internal import pickler
@@ -96,7 +97,8 @@ def to_runner_api(self, context):
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=urn,
             payload=typed_param.SerializeToString()
-            if typed_param is not None else None))
+            if isinstance(typed_param, message.Message)
+            else typed_param))
 
   @classmethod
   def from_runner_api(cls, fn_proto, context):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 92995)
    Time Spent: 40m  (was: 0.5h)

> Avoid pickling PTransforms in proto representation
> --------------------------------------------------
>
>                 Key: BEAM-3812
>                 URL: https://issues.apache.org/jira/browse/BEAM-3812
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Ahmet Altay
>            Priority: Minor
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Any transform that requires passing information through the runner protos 
> should have an explicit urn and payload. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to