Factor out URN registration logic.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87a475e4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87a475e4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87a475e4 Branch: refs/heads/master Commit: 87a475e40a346d104e5b30e9e2e3f60b9e56916b Parents: a95fc19 Author: Robert Bradshaw <[email protected]> Authored: Wed Apr 19 11:56:55 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Wed Apr 26 18:14:13 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/transforms/window.py | 90 +++++----------------- sdks/python/apache_beam/utils/urns.py | 91 +++++++++++++++++++++++ 2 files changed, 108 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 7e56c23..9c4b109 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -52,10 +52,8 @@ from __future__ import absolute_import import abc from google.protobuf import struct_pb2 -from google.protobuf import wrappers_pb2 from apache_beam import coders -from apache_beam.internal import pickler from apache_beam.runners.api import beam_runner_api_pb2 from apache_beam.transforms import timeutil from apache_beam.transforms.timeutil import Duration @@ -92,7 +90,7 @@ class OutputTimeFn(object): raise ValueError('Invalid OutputTimeFn: %s.' 
% output_time_fn) -class WindowFn(object): +class WindowFn(urns.RunnerApiFn): """An abstract windowing function defining a basic assign and merge.""" __metaclass__ = abc.ABCMeta @@ -150,39 +148,7 @@ class WindowFn(object): # By default, just return the input timestamp. return input_timestamp - _known_urns = {} - - @classmethod - def register_urn(cls, urn, parameter_type, constructor): - cls._known_urns[urn] = parameter_type, constructor - - @classmethod - def from_runner_api(cls, fn_proto, context): - parameter_type, constructor = cls._known_urns[fn_proto.spec.urn] - return constructor( - proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type), - context) - - def to_runner_api(self, context): - urn, typed_param = self.to_runner_api_parameter(context) - return beam_runner_api_pb2.SdkFunctionSpec( - spec=beam_runner_api_pb2.FunctionSpec( - urn=urn, - parameter=proto_utils.pack_Any(typed_param))) - - @staticmethod - def from_runner_api_parameter(fn_parameter, unused_context): - return pickler.loads(fn_parameter.value) - - def to_runner_api_parameter(self, context): - return (urns.PICKLED_WINDOW_FN, - wrappers_pb2.BytesValue(value=pickler.dumps(self))) - - -WindowFn.register_urn( - urns.PICKLED_WINDOW_FN, - wrappers_pb2.BytesValue, - WindowFn.from_runner_api_parameter) + urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_WINDOW_FN) class BoundedWindow(object): @@ -315,16 +281,12 @@ class GlobalWindows(NonMergingWindowFn): def __ne__(self, other): return not self == other - @staticmethod - def from_runner_api_parameter(unused_fn_parameter, unused_context): - return GlobalWindows() - def to_runner_api_parameter(self, context): return urns.GLOBAL_WINDOWS_FN, None - -WindowFn.register_urn( - urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter) + @urns.RunnerApiFn.register_urn(urns.GLOBAL_WINDOWS_FN, None) + def from_runner_api_parameter(unused_fn_parameter, unused_context): + return GlobalWindows() class FixedWindows(NonMergingWindowFn): @@ -362,22 
+324,16 @@ class FixedWindows(NonMergingWindowFn): def __ne__(self, other): return not self == other - @staticmethod - def from_runner_api_parameter(fn_parameter, unused_context): - return FixedWindows( - size=Duration(micros=fn_parameter['size']), - offset=Timestamp(micros=fn_parameter['offset'])) - def to_runner_api_parameter(self, context): return (urns.FIXED_WINDOWS_FN, proto_utils.pack_Struct(size=self.size.micros, offset=self.offset.micros)) - -WindowFn.register_urn( - urns.FIXED_WINDOWS_FN, - struct_pb2.Struct, - FixedWindows.from_runner_api_parameter) + @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct) + def from_runner_api_parameter(fn_parameter, unused_context): + return FixedWindows( + size=Duration(micros=fn_parameter['size']), + offset=Timestamp(micros=fn_parameter['offset'])) class SlidingWindows(NonMergingWindowFn): @@ -419,13 +375,6 @@ class SlidingWindows(NonMergingWindowFn): and self.offset == other.offset and self.period == other.period) - @staticmethod - def from_runner_api_parameter(fn_parameter, unused_context): - return SlidingWindows( - size=Duration(micros=fn_parameter['size']), - offset=Timestamp(micros=fn_parameter['offset']), - period=Duration(micros=fn_parameter['period'])) - def to_runner_api_parameter(self, context): return (urns.SLIDING_WINDOWS_FN, proto_utils.pack_Struct( @@ -433,11 +382,12 @@ class SlidingWindows(NonMergingWindowFn): offset=self.offset.micros, period=self.period.micros)) - -WindowFn.register_urn( - urns.SLIDING_WINDOWS_FN, - struct_pb2.Struct, - SlidingWindows.from_runner_api_parameter) + @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct) + def from_runner_api_parameter(fn_parameter, unused_context): + return SlidingWindows( + size=Duration(micros=fn_parameter['size']), + offset=Timestamp(micros=fn_parameter['offset']), + period=Duration(micros=fn_parameter['period'])) class Sessions(WindowFn): @@ -487,16 +437,10 @@ class Sessions(WindowFn): if type(self) == 
type(other) == Sessions: return self.gap_size == other.gap_size - @staticmethod + @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct) def from_runner_api_parameter(fn_parameter, unused_context): return Sessions(gap_size=Duration(micros=fn_parameter['gap_size'])) def to_runner_api_parameter(self, context): return (urns.SESSION_WINDOWS_FN, proto_utils.pack_Struct(gap_size=self.gap_size.micros)) - - -WindowFn.register_urn( - urns.SESSION_WINDOWS_FN, - struct_pb2.Struct, - Sessions.from_runner_api_parameter) http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index a2f3a3e..46bd8f5 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -15,6 +15,15 @@ # limitations under the License. # +import abc +import inspect + +from google.protobuf import wrappers_pb2 + +from apache_beam.internal import pickler +from apache_beam.utils import proto_utils + + PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1" GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1" FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1" @@ -26,3 +35,85 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1" PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1" FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1" WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1" + + +class RunnerApiFn(object): + """Abstract base class that provides urn registration utilities. + + A class that inherits from this class will get a registration-based + from_runner_api and to_runner_api method that convert to and from + beam_runner_api_pb2.SdkFunctionSpec. + + Additionally, register_pickle_urn can be called from the body of a class + to register serialization via pickling. 
+ """ + + __metaclass__ = abc.ABCMeta + + _known_urns = {} + + @abc.abstractmethod + def to_runner_api_parameter(self, unused_context): + """Returns the urn and payload for this Fn. + + The returned urn(s) should be registered with `register_urn`. + """ + pass + + @classmethod + def register_urn(cls, urn, parameter_type, fn=None): + """Registers a urn with a constructor. + + For example, if 'beam:fn:foo' had parameter type FooPayload, one could + write `RunnerApiFn.register_urn('beam:fn:foo', FooPayload, foo_from_proto)` + where foo_from_proto took as arguments a FooPayload and a PipelineContext. + This function can also be used as a decorator rather than passing the + callable in as the final parameter. + + A corresponding to_runner_api_parameter method would be expected that + returns the tuple ('beam:fn:foo', FooPayload) + """ + def register(fn): + cls._known_urns[urn] = parameter_type, fn + return staticmethod(fn) + if fn: + # Used as a statement. + register(fn) + else: + # Used as a decorator. + return register + + @classmethod + def register_pickle_urn(cls, pickle_urn): + """Registers and implements the given urn via pickling. + """ + inspect.currentframe().f_back.f_locals['to_runner_api_parameter'] = ( + lambda self, context: ( + pickle_urn, wrappers_pb2.BytesValue(value=pickler.dumps(self)))) + cls.register_urn( + pickle_urn, + wrappers_pb2.BytesValue, + lambda proto, unused_context: pickler.loads(proto.value)) + + def to_runner_api(self, context): + """Returns an SdkFunctionSpec encoding this Fn. + + Prefer overriding self.to_runner_api_parameter. + """ + from apache_beam.runners.api import beam_runner_api_pb2 + urn, typed_param = self.to_runner_api_parameter(context) + return beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=urn, + parameter=proto_utils.pack_Any(typed_param))) + + @classmethod + def from_runner_api(cls, fn_proto, context): + """Converts from an SdkFunctionSpec to a Fn object.
+ + Prefer registering a urn with its parameter type and constructor. + """ + parameter_type, constructor = cls._known_urns[fn_proto.spec.urn] + return constructor( + proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type), + context)
