Code cleanup now that all runners support windowed side inputs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6cb2f37e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6cb2f37e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6cb2f37e Branch: refs/heads/master Commit: 6cb2f37efadfb52138b125fcaf51e703c2c5fd5a Parents: deb2aea Author: Robert Bradshaw <[email protected]> Authored: Sat Jan 21 21:13:36 2017 -0800 Committer: Robert Bradshaw <[email protected]> Committed: Mon Jan 23 14:37:45 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.py | 34 ++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6cb2f37e/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 0f63cbc..9c8fdfc 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -24,7 +24,6 @@ import sys from apache_beam.internal import util from apache_beam.pvalue import SideOutputValue from apache_beam.transforms import core -from apache_beam.transforms import sideinputs from apache_beam.transforms import window from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn @@ -71,6 +70,21 @@ class DoFnRunner(Receiver): # Preferred alternative to context # TODO(robertwb): Remove once all runners are updated. state=None): + """Initializes a DoFnRunner. + + Args: + fn: user DoFn to invoke + args: positional side input arguments (static and placeholder), if any + kwargs: keyword side input arguments (static and placeholder), if any + side_inputs: list of sideinput.SideInputMaps for deferred side inputs + windowing: windowing properties of the output PCollection(s) + context: a DoFnContext to use (deprecated) + tagged_receivers: a dict of tag name to Receiver objects + logger: a logging module (deprecated) + step_name: the name of this step + logging_context: a LoggingContext object + state: handle for accessing DoFn state + """ self.step_name = step_name self.window_fn = windowing.windowfn self.tagged_receivers = tagged_receivers @@ -97,14 +111,10 @@ class DoFnRunner(Receiver): if isinstance(fn, core.NewDoFn): self.is_new_dofn = True - # SideInputs - self.side_inputs = [side_input - if isinstance(side_input, sideinputs.SideInputMap) - else {global_window: side_input} - for side_input in side_inputs] + # Stash values for use in new_dofn_process. + self.side_inputs = side_inputs self.has_windowed_side_inputs = not all( - isinstance(si, dict) or si.is_globally_windowed() - for si in self.side_inputs) + si.is_globally_windowed() for si in self.side_inputs) self.args = args if args else [] self.kwargs = kwargs if kwargs else {} @@ -117,14 +127,8 @@ class DoFnRunner(Receiver): self.dofn = fn self.dofn_process = fn.process else: - # TODO(robertwb): Remove when all runners pass side input maps. - side_inputs = [side_input - if isinstance(side_input, sideinputs.SideInputMap) - else {global_window: side_input} - for side_input in side_inputs] if side_inputs and all( - isinstance(side_input, dict) or side_input.is_globally_windowed() - for side_input in side_inputs): + side_input.is_globally_windowed() for side_input in side_inputs): args, kwargs = util.insert_values_in_args( args, kwargs, [side_input[global_window] for side_input in side_inputs])
