Per-transform runner api dispatch.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c9b174
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c9b174
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c9b174

Branch: refs/heads/master
Commit: 97c9b174d027e58ac5202cc1eedeaec59b57023a
Parents: 26c61f4
Author: Robert Bradshaw <[email protected]>
Authored: Tue Apr 18 15:29:04 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Apr 26 18:14:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             | 18 ++++-----
 .../python/apache_beam/transforms/ptransform.py | 40 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 0f4c8db..100c50a 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -52,7 +52,6 @@ import os
 import shutil
 import tempfile
 
-from google.protobuf import wrappers_pb2
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
@@ -60,8 +59,6 @@ from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
-from apache_beam.utils import proto_utils
-from apache_beam.utils import urns
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 from apache_beam.utils.pipeline_options import StandardOptions
@@ -514,12 +511,15 @@ class AppliedPTransform(object):
 
   def to_runner_api(self, context):
     from apache_beam.runners.api import beam_runner_api_pb2
+
+    def transform_to_runner_api(transform, context):
+      if transform is None:
+        return None
+      else:
+        return transform.to_runner_api(context)
     return beam_runner_api_pb2.PTransform(
         unique_name=self.full_label,
-        spec=beam_runner_api_pb2.FunctionSpec(
-            urn=urns.PICKLED_TRANSFORM,
-            parameter=proto_utils.pack_Any(
-                wrappers_pb2.BytesValue(value=pickler.dumps(self.transform)))),
+        spec=transform_to_runner_api(self.transform, context),
         subtransforms=[context.transforms.get_id(part) for part in self.parts],
         # TODO(BEAM-115): Side inputs.
         inputs={tag: context.pcollections.get_id(pc)
@@ -533,9 +533,7 @@ class AppliedPTransform(object):
   def from_runner_api(proto, context):
     result = AppliedPTransform(
         parent=None,
-        transform=pickler.loads(
-            proto_utils.unpack_Any(proto.spec.parameter,
-                                   wrappers_pb2.BytesValue).value),
+        transform=ptransform.PTransform.from_runner_api(proto.spec, context),
         full_label=proto.unique_name,
         inputs=[
             context.pcollections.get_by_id(id) for id in proto.inputs.values()])


http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b174/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index e2c4428..706b003 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -42,6 +42,8 @@ import operator
 import os
 import sys
 
+from google.protobuf import wrappers_pb2
+
 from apache_beam import error
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -54,6 +56,8 @@ from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import validate_composite_type_param
 from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import instance_to_type
+from apache_beam.utils import proto_utils
+from apache_beam.utils import urns
 
 
 class _PValueishTransform(object):
@@ -412,6 +416,42 @@ class PTransform(WithTypeHints, HasDisplayData):
         yield pvalueish
     return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
 
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor):
+    cls._known_urns[urn] = parameter_type, constructor
+
+  def to_runner_api(self, context):
+    from apache_beam.runners.api import beam_runner_api_pb2
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.FunctionSpec(
+        urn=urn,
+        parameter=proto_utils.pack_Any(typed_param))
+
+  @classmethod
+  def from_runner_api(cls, proto, context):
+    if proto is None or not proto.urn:
+      return None
+    parameter_type, constructor = cls._known_urns[proto.urn]
+    return constructor(
+        proto_utils.unpack_Any(proto.parameter, parameter_type),
+        context)
+
+  def to_runner_api_parameter(self, context):
+    return (urns.PICKLED_TRANSFORM,
+            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
+
+  @staticmethod
+  def from_runner_api_parameter(spec_parameter, unused_context):
+    return pickler.loads(spec_parameter.value)
+
+
+PTransform.register_urn(
+    urns.PICKLED_TRANSFORM,
+    wrappers_pb2.BytesValue,
+    PTransform.from_runner_api_parameter)
+
 
 class ChainedPTransform(PTransform):
