Translate WindowInto to and from the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a95fc199 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a95fc199 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a95fc199 Branch: refs/heads/master Commit: a95fc199fb7daa7a5e7dd2be7d1eda11748b0e6b Parents: d096b19 Author: Robert Bradshaw <[email protected]> Authored: Tue Apr 18 16:07:32 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Wed Apr 26 18:14:13 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/transforms/core.py | 35 ++++++++++++++++++++----- sdks/python/apache_beam/utils/urns.py | 1 + 2 files changed, 30 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 14cc620..64911d6 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -590,6 +590,9 @@ class ParDo(PTransformWithSideInputs): def __init__(self, fn, *args, **kwargs): super(ParDo, self).__init__(fn, *args, **kwargs) + # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. + self.dofn = self.fn + self.output_tags = set() if not isinstance(self.fn, DoFn): raise TypeError('ParDo must be called with a DoFn instance.') @@ -615,9 +618,6 @@ class ParDo(PTransformWithSideInputs): 'fn_dd': self.fn} def expand(self, pcoll): - self.output_tags = set() - # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. 
- self.dofn = self.fn return pvalue.PCollection(pcoll.pipeline) def with_outputs(self, *tags, **main_kw): @@ -1268,7 +1268,7 @@ class WindowInto(ParDo): new_windows = self.windowing.windowfn.assign(context) yield WindowedValue(element, context.timestamp, new_windows) - def __init__(self, windowfn, *args, **kwargs): + def __init__(self, windowfn, **kwargs): """Initializes a WindowInto transform. Args: @@ -1279,8 +1279,7 @@ class WindowInto(ParDo): output_time_fn = kwargs.pop('output_time_fn', None) self.windowing = Windowing(windowfn, triggerfn, accumulation_mode, output_time_fn) - dofn = self.WindowIntoFn(self.windowing) - super(WindowInto, self).__init__(dofn) + super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing)) def get_windowing(self, unused_inputs): return self.windowing @@ -1297,6 +1296,30 @@ class WindowInto(ParDo): self.with_output_types(output_type) return super(WindowInto, self).expand(pcoll) + def to_runner_api_parameter(self, context): + return ( + urns.WINDOW_INTO_TRANSFORM, + self.windowing.to_runner_api(context)) + + @staticmethod + def from_runner_api_parameter(proto, context): + windowing = Windowing.from_runner_api(proto, context) + return WindowInto( + windowing.windowfn, + trigger=windowing.triggerfn, + accumulation_mode=windowing.accumulation_mode, + output_time_fn=windowing.output_time_fn) + + +PTransform.register_urn( + urns.WINDOW_INTO_TRANSFORM, + # TODO(robertwb): Update WindowIntoPayload to include the full strategy. + # (Right now only WindowFn is used, but we need this to reconstitute the + # WindowInto transform, and in the future will need it at runtime to + # support meta-data driven triggers.) + beam_runner_api_pb2.WindowingStrategy, + WindowInto.from_runner_api_parameter) + # Python's pickling is broken for nested classes. 
WindowIntoFn = WindowInto.WindowIntoFn http://git-wip-us.apache.org/repos/asf/beam/blob/a95fc199/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index d10dd26..a2f3a3e 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -25,3 +25,4 @@ PICKLED_CODER = "beam:coder:pickled_python:v0.1" PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1" FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1" +WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"
