Translate WindowInto through the Runner API.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a95fc199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a95fc199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a95fc199

Branch: refs/heads/master
Commit: a95fc199fb7daa7a5e7dd2be7d1eda11748b0e6b
Parents: d096b19
Author: Robert Bradshaw <[email protected]>
Authored: Tue Apr 18 16:07:32 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Apr 26 18:14:13 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/core.py | 35 ++++++++++++++++++++-----
 sdks/python/apache_beam/utils/urns.py      |  1 +
 2 files changed, 30 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 14cc620..64911d6 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -590,6 +590,9 @@ class ParDo(PTransformWithSideInputs):
 
   def __init__(self, fn, *args, **kwargs):
     super(ParDo, self).__init__(fn, *args, **kwargs)
+    # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
+    self.dofn = self.fn
+    self.output_tags = set()
 
     if not isinstance(self.fn, DoFn):
       raise TypeError('ParDo must be called with a DoFn instance.')
@@ -615,9 +618,6 @@ class ParDo(PTransformWithSideInputs):
             'fn_dd': self.fn}
 
   def expand(self, pcoll):
-    self.output_tags = set()
-    # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
-    self.dofn = self.fn
     return pvalue.PCollection(pcoll.pipeline)
 
   def with_outputs(self, *tags, **main_kw):
@@ -1268,7 +1268,7 @@ class WindowInto(ParDo):
       new_windows = self.windowing.windowfn.assign(context)
       yield WindowedValue(element, context.timestamp, new_windows)
 
-  def __init__(self, windowfn, *args, **kwargs):
+  def __init__(self, windowfn, **kwargs):
     """Initializes a WindowInto transform.
 
     Args:
@@ -1279,8 +1279,7 @@ class WindowInto(ParDo):
     output_time_fn = kwargs.pop('output_time_fn', None)
     self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
                                output_time_fn)
-    dofn = self.WindowIntoFn(self.windowing)
-    super(WindowInto, self).__init__(dofn)
+    super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
 
   def get_windowing(self, unused_inputs):
     return self.windowing
@@ -1297,6 +1296,30 @@ class WindowInto(ParDo):
       self.with_output_types(output_type)
     return super(WindowInto, self).expand(pcoll)
 
+  def to_runner_api_parameter(self, context):
+    return (
+        urns.WINDOW_INTO_TRANSFORM,
+        self.windowing.to_runner_api(context))
+
+  @staticmethod
+  def from_runner_api_parameter(proto, context):
+    windowing = Windowing.from_runner_api(proto, context)
+    return WindowInto(
+        windowing.windowfn,
+        trigger=windowing.triggerfn,
+        accumulation_mode=windowing.accumulation_mode,
+        output_time_fn=windowing.output_time_fn)
+
+
+PTransform.register_urn(
+    urns.WINDOW_INTO_TRANSFORM,
+    # TODO(robertwb): Update WindowIntoPayload to include the full strategy.
+    # (Right now only WindowFn is used, but we need this to reconstitute the
+    # WindowInto transform, and in the future will need it at runtime to
+    # support meta-data driven triggers.)
+    beam_runner_api_pb2.WindowingStrategy,
+    WindowInto.from_runner_api_parameter)
+
 
 # Python's pickling is broken for nested classes.
 WindowIntoFn = WindowInto.WindowIntoFn

http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index d10dd26..a2f3a3e 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -25,3 +25,4 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
+WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"

Reply via email to