Factor out URN registration logic.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87a475e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87a475e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87a475e4

Branch: refs/heads/master
Commit: 87a475e40a346d104e5b30e9e2e3f60b9e56916b
Parents: a95fc19
Author: Robert Bradshaw <[email protected]>
Authored: Wed Apr 19 11:56:55 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Apr 26 18:14:13 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/window.py | 90 +++++-----------------
 sdks/python/apache_beam/utils/urns.py        | 91 +++++++++++++++++++++++
 2 files changed, 108 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 7e56c23..9c4b109 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -52,10 +52,8 @@ from __future__ import absolute_import
 import abc
 
 from google.protobuf import struct_pb2
-from google.protobuf import wrappers_pb2
 
 from apache_beam import coders
-from apache_beam.internal import pickler
 from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import timeutil
 from apache_beam.transforms.timeutil import Duration
@@ -92,7 +90,7 @@ class OutputTimeFn(object):
       raise ValueError('Invalid OutputTimeFn: %s.' % output_time_fn)
 
 
-class WindowFn(object):
+class WindowFn(urns.RunnerApiFn):
   """An abstract windowing function defining a basic assign and merge."""
 
   __metaclass__ = abc.ABCMeta
@@ -150,39 +148,7 @@ class WindowFn(object):
     # By default, just return the input timestamp.
     return input_timestamp
 
-  _known_urns = {}
-
-  @classmethod
-  def register_urn(cls, urn, parameter_type, constructor):
-    cls._known_urns[urn] = parameter_type, constructor
-
-  @classmethod
-  def from_runner_api(cls, fn_proto, context):
-    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
-    return constructor(
-        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
-        context)
-
-  def to_runner_api(self, context):
-    urn, typed_param = self.to_runner_api_parameter(context)
-    return beam_runner_api_pb2.SdkFunctionSpec(
-        spec=beam_runner_api_pb2.FunctionSpec(
-            urn=urn,
-            parameter=proto_utils.pack_Any(typed_param)))
-
-  @staticmethod
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return pickler.loads(fn_parameter.value)
-
-  def to_runner_api_parameter(self, context):
-    return (urns.PICKLED_WINDOW_FN,
-            wrappers_pb2.BytesValue(value=pickler.dumps(self)))
-
-
-WindowFn.register_urn(
-    urns.PICKLED_WINDOW_FN,
-    wrappers_pb2.BytesValue,
-    WindowFn.from_runner_api_parameter)
+  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_WINDOW_FN)
 
 
 class BoundedWindow(object):
@@ -315,16 +281,12 @@ class GlobalWindows(NonMergingWindowFn):
   def __ne__(self, other):
     return not self == other
 
-  @staticmethod
-  def from_runner_api_parameter(unused_fn_parameter, unused_context):
-    return GlobalWindows()
-
   def to_runner_api_parameter(self, context):
     return urns.GLOBAL_WINDOWS_FN, None
 
-
-WindowFn.register_urn(
-    urns.GLOBAL_WINDOWS_FN, None, GlobalWindows.from_runner_api_parameter)
+  @urns.RunnerApiFn.register_urn(urns.GLOBAL_WINDOWS_FN, None)
+  def from_runner_api_parameter(unused_fn_parameter, unused_context):
+    return GlobalWindows()
 
 
 class FixedWindows(NonMergingWindowFn):
@@ -362,22 +324,16 @@ class FixedWindows(NonMergingWindowFn):
   def __ne__(self, other):
     return not self == other
 
-  @staticmethod
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return FixedWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']))
-
   def to_runner_api_parameter(self, context):
     return (urns.FIXED_WINDOWS_FN,
             proto_utils.pack_Struct(size=self.size.micros,
                                     offset=self.offset.micros))
 
-
-WindowFn.register_urn(
-    urns.FIXED_WINDOWS_FN,
-    struct_pb2.Struct,
-    FixedWindows.from_runner_api_parameter)
+  @urns.RunnerApiFn.register_urn(urns.FIXED_WINDOWS_FN, struct_pb2.Struct)
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return FixedWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']))
 
 
 class SlidingWindows(NonMergingWindowFn):
@@ -419,13 +375,6 @@ class SlidingWindows(NonMergingWindowFn):
               and self.offset == other.offset
               and self.period == other.period)
 
-  @staticmethod
-  def from_runner_api_parameter(fn_parameter, unused_context):
-    return SlidingWindows(
-        size=Duration(micros=fn_parameter['size']),
-        offset=Timestamp(micros=fn_parameter['offset']),
-        period=Duration(micros=fn_parameter['period']))
-
   def to_runner_api_parameter(self, context):
     return (urns.SLIDING_WINDOWS_FN,
             proto_utils.pack_Struct(
@@ -433,11 +382,12 @@ class SlidingWindows(NonMergingWindowFn):
                 offset=self.offset.micros,
                 period=self.period.micros))
 
-
-WindowFn.register_urn(
-    urns.SLIDING_WINDOWS_FN,
-    struct_pb2.Struct,
-    SlidingWindows.from_runner_api_parameter)
+  @urns.RunnerApiFn.register_urn(urns.SLIDING_WINDOWS_FN, struct_pb2.Struct)
+  def from_runner_api_parameter(fn_parameter, unused_context):
+    return SlidingWindows(
+        size=Duration(micros=fn_parameter['size']),
+        offset=Timestamp(micros=fn_parameter['offset']),
+        period=Duration(micros=fn_parameter['period']))
 
 
 class Sessions(WindowFn):
@@ -487,16 +437,10 @@ class Sessions(WindowFn):
     if type(self) == type(other) == Sessions:
       return self.gap_size == other.gap_size
 
-  @staticmethod
+  @urns.RunnerApiFn.register_urn(urns.SESSION_WINDOWS_FN, struct_pb2.Struct)
   def from_runner_api_parameter(fn_parameter, unused_context):
     return Sessions(gap_size=Duration(micros=fn_parameter['gap_size']))
 
   def to_runner_api_parameter(self, context):
     return (urns.SESSION_WINDOWS_FN,
             proto_utils.pack_Struct(gap_size=self.gap_size.micros))
-
-
-WindowFn.register_urn(
-    urns.SESSION_WINDOWS_FN,
-    struct_pb2.Struct,
-    Sessions.from_runner_api_parameter)

http://git-wip-us.apache.org/repos/asf/beam/blob/87a475e4/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index a2f3a3e..46bd8f5 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -15,6 +15,15 @@
 # limitations under the License.
 #
 
+import abc
+import inspect
+
+from google.protobuf import wrappers_pb2
+
+from apache_beam.internal import pickler
+from apache_beam.utils import proto_utils
+
+
 PICKLED_WINDOW_FN = "beam:window_fn:pickled_python:v0.1"
 GLOBAL_WINDOWS_FN = "beam:window_fn:global_windows:v0.1"
 FIXED_WINDOWS_FN = "beam:window_fn:fixed_windows:v0.1"
@@ -26,3 +35,85 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
 WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"
+
+
+class RunnerApiFn(object):
+  """Abstract base class that provides urn registration utilities.
+
+  A class that inherits from this class will get registration-based
+  from_runner_api and to_runner_api methods that convert to and from
+  beam_runner_api_pb2.SdkFunctionSpec.
+
+  Additionally, register_pickle_urn can be called from the body of a class
+  to register serialization via pickling.
+  """
+
+  __metaclass__ = abc.ABCMeta
+
+  _known_urns = {}
+
+  @abc.abstractmethod
+  def to_runner_api_parameter(self, unused_context):
+    """Returns the urn and payload for this Fn.
+
+    The returned urn(s) should be registered with `register_urn`.
+    """
+    pass
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, fn=None):
+    """Registers a urn with a constructor.
+
+    For example, if 'beam:fn:foo' had parameter type FooPayload, one could
+    write `RunnerApiFn.register_urn('beam:fn:foo', FooPayload, foo_from_proto)`
+    where foo_from_proto took as arguments a FooPayload and a PipelineContext.
+    This function can also be used as a decorator rather than passing the
+    callable in as the final parameter.
+
+    A corresponding to_runner_api_parameter method is expected to return
+    the tuple ('beam:fn:foo', FooPayload(...)), i.e. a payload instance.
+    """
+    def register(fn):
+      cls._known_urns[urn] = parameter_type, fn
+      return staticmethod(fn)
+    if fn:
+      # Used as a statement.
+      register(fn)
+    else:
+      # Used as a decorator.
+      return register
+
+  @classmethod
+  def register_pickle_urn(cls, pickle_urn):
+    """Registers and implements the given urn via pickling.
+    """
+    inspect.currentframe().f_back.f_locals['to_runner_api_parameter'] = (
+        lambda self, context: (
+            pickle_urn, wrappers_pb2.BytesValue(value=pickler.dumps(self))))
+    cls.register_urn(
+        pickle_urn,
+        wrappers_pb2.BytesValue,
+        lambda proto, unused_context: pickler.loads(proto.value))
+
+  def to_runner_api(self, context):
+    """Returns an SdkFunctionSpec encoding this Fn.
+
+    Prefer overriding self.to_runner_api_parameter.
+    """
+    from apache_beam.runners.api import beam_runner_api_pb2
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.SdkFunctionSpec(
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=urn,
+            parameter=proto_utils.pack_Any(typed_param)))
+
+  @classmethod
+  def from_runner_api(cls, fn_proto, context):
+    """Converts from an SdkFunctionSpec to a Fn object.
+
+    Prefer registering a urn with its parameter type and constructor.
+    """
+    parameter_type, constructor = cls._known_urns[fn_proto.spec.urn]
+    return constructor(
+        proto_utils.unpack_Any(fn_proto.spec.parameter, parameter_type),
+        context)

Reply via email to