Repository: beam Updated Branches: refs/heads/master 70b16c74d -> 1a6f2e8f6
Some performance improvements to NewDoFn Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/956b81cf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/956b81cf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/956b81cf Branch: refs/heads/master Commit: 956b81cfa18281366c2f6bc41b02b099ec37d210 Parents: 70b16c7 Author: Sourabh Bajaj <[email protected]> Authored: Mon Jan 30 11:16:16 2017 -0800 Committer: Ahmet Altay <[email protected]> Committed: Thu Feb 2 13:36:58 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.pxd | 9 +- sdks/python/apache_beam/runners/common.py | 157 +++++++++++++++--------- 2 files changed, 107 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/956b81cf/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index f41b313..dbb08f0 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -38,17 +38,22 @@ cdef class DoFnRunner(Receiver): cdef LoggingContext logging_context cdef object step_name cdef bint is_new_dofn - cdef object args + cdef list args cdef dict kwargs - cdef object side_inputs cdef ScopedMetricsContainer scoped_metrics_container + cdef list side_inputs cdef bint has_windowed_side_inputs + cdef list placeholders + cdef bint simple_process cdef Receiver main_receivers cpdef process(self, WindowedValue element) cdef old_dofn_process(self, WindowedValue element) cdef new_dofn_process(self, WindowedValue element) + cdef new_dofn_simple_process(self, WindowedValue element) + cdef _new_dofn_window_process( + self, WindowedValue element, list args, dict kwargs, object window) @cython.locals(windowed_value=WindowedValue) cpdef _process_outputs(self, WindowedValue element, results) http://git-wip-us.apache.org/repos/asf/beam/blob/956b81cf/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index dbbd9ba..0089f34 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -25,9 +25,9 @@ from apache_beam.internal import util from apache_beam.metrics.execution import ScopedMetricsContainer from apache_beam.pvalue import SideOutputValue from apache_beam.transforms import core -from apache_beam.transforms import window from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn +from apache_beam.transforms.window import GlobalWindow from apache_beam.utils.windowed_value import WindowedValue @@ -94,7 +94,7 @@ class DoFnRunner(Receiver): self.scoped_metrics_container = (scoped_metrics_container or ScopedMetricsContainer()) - global_window = window.GlobalWindow() + global_window = GlobalWindow() # Need to support multiple iterations. side_inputs = list(side_inputs) @@ -117,6 +117,11 @@ class DoFnRunner(Receiver): # TODO(Sourabhbajaj): Remove the usage of OldDoFn if isinstance(fn, core.NewDoFn): + + class ArgPlaceholder(object): + def __init__(self, placeholder): + self.placeholder = placeholder + self.is_new_dofn = True # Stash values for use in new_dofn_process. @@ -127,6 +132,70 @@ class DoFnRunner(Receiver): self.args = args if args else [] self.kwargs = kwargs if kwargs else {} self.dofn = fn + self.dofn_process = fn.process + + arguments, _, _, defaults = self.dofn.get_function_arguments('process') + defaults = defaults if defaults else [] + self_in_args = int(self.dofn.is_process_bounded()) + + self.simple_process = ( + not side_inputs and not args and not kwargs and not defaults) + if self.simple_process: + return + + # TODO(Sourabhbajaj) Rename this variable once oldDoFn is deprecated + self.has_windowed_side_inputs = ( + self.has_windowed_side_inputs or + core.NewDoFn.WindowParam in defaults) + + # Try to prepare all the arguments that can just be filled in + # without any additional work. in the process function. + # Also cache all the placeholders needed in the process function. + + # Fill in sideInputs if they are globally windowed + if not self.has_windowed_side_inputs: + self.args, self.kwargs = util.insert_values_in_args( + args, kwargs, [si[global_window] for si in side_inputs]) + + # Create placeholder for element parameter + if core.NewDoFn.ElementParam not in defaults: + args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args + final_args = [ArgPlaceholder(core.NewDoFn.ElementParam)] + \ + self.args[:args_to_pick] + else: + args_to_pick = len(arguments) - len(defaults) - self_in_args + final_args = self.args[:args_to_pick] + + # Fill the OtherPlaceholders for context, window or timestamp + args = iter(self.args[args_to_pick:]) + for a, d in zip(arguments[-len(defaults):], defaults): + if d == core.NewDoFn.ElementParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.NewDoFn.ContextParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.NewDoFn.WindowParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.NewDoFn.TimestampParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.NewDoFn.SideInputParam: + # If no more args are present then the value must be passed via kwarg + try: + final_args.append(args.next()) + except StopIteration: + if a not in self.kwargs: + raise ValueError("Value for sideinput %s not provided" % a) + else: + # If no more args are present then the value must be passed via kwarg + try: + final_args.append(args.next()) + except StopIteration: + pass + final_args.extend(list(args)) + self.args = final_args + + # Stash the list of placeholder positions for performance + self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args) + if isinstance(x, ArgPlaceholder)] else: self.is_new_dofn = False @@ -177,66 +246,37 @@ class DoFnRunner(Receiver): self.context.set_element(element) self._process_outputs(element, self.dofn_process(self.context)) + def new_dofn_simple_process(self, element): + self._process_outputs(element, self.dofn_process(element.value)) + + def _new_dofn_window_process(self, element, args, kwargs, window): + # TODO(sourabhbajaj): Investigate why we can't use `is` instead of == + for i, p in self.placeholders: + if p == core.NewDoFn.ElementParam: + args[i] = element.value + elif p == core.NewDoFn.ContextParam: + args[i] = self.context + elif p == core.NewDoFn.WindowParam: + args[i] = window + elif p == core.NewDoFn.TimestampParam: + args[i] = element.timestamp + if not kwargs: + self._process_outputs(element, self.dofn_process(*args)) + else: + self._process_outputs(element, self.dofn_process(*args, **kwargs)) + def new_dofn_process(self, element): self.context.set_element(element) - arguments, _, _, defaults = self.dofn.get_function_arguments('process') - defaults = defaults if defaults else [] - - self_in_args = int(self.dofn.is_process_bounded()) - # Call for the process function for each window if has windowed side inputs # or if the process accesses the window parameter. We can just call it once # otherwise as none of the arguments are changing - if self.has_windowed_side_inputs or core.NewDoFn.WindowParam in defaults: - windows = element.windows + if self.has_windowed_side_inputs: + for w in element.windows: + args, kwargs = util.insert_values_in_args( + self.args, self.kwargs, [si[w] for si in self.side_inputs]) + self._new_dofn_window_process(element, args, kwargs, w) else: - windows = [window.GlobalWindow()] - - for w in windows: - args, kwargs = util.insert_values_in_args( - self.args, self.kwargs, - [s[w] for s in self.side_inputs]) - - # If there are more arguments than the default then the first argument - # should be the element and the rest should be picked from the side - # inputs as window and timestamp should always be tagged - if len(arguments) > len(defaults) + self_in_args: - if core.NewDoFn.ElementParam not in defaults: - args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args - final_args = [element.value] + args[:args_to_pick] - else: - args_to_pick = len(arguments) - len(defaults) - self_in_args - final_args = args[:args_to_pick] - else: - args_to_pick = 0 - final_args = [] - args = iter(args[args_to_pick:]) - - for a, d in zip(arguments[-len(defaults):], defaults): - if d == core.NewDoFn.ElementParam: - final_args.append(element.value) - elif d == core.NewDoFn.ContextParam: - final_args.append(self.context) - elif d == core.NewDoFn.WindowParam: - final_args.append(w) - elif d == core.NewDoFn.TimestampParam: - final_args.append(element.timestamp) - elif d == core.NewDoFn.SideInputParam: - # If no more args are present then the value must be passed via kwarg - try: - final_args.append(args.next()) - except StopIteration: - if a not in kwargs: - raise - else: - # If no more args are present then the value must be passed via kwarg - try: - final_args.append(args.next()) - except StopIteration: - if a not in kwargs: - kwargs[a] = d - final_args.extend(list(args)) - self._process_outputs(element, self.dofn.process(*final_args, **kwargs)) + self._new_dofn_window_process(element, self.args, self.kwargs, None) def _invoke_bundle_method(self, method): try: @@ -271,7 +311,10 @@ class DoFnRunner(Receiver): self.logging_context.enter() self.scoped_metrics_container.enter() if self.is_new_dofn: - self.new_dofn_process(element) + if self.simple_process: + self.new_dofn_simple_process(element) + else: + self.new_dofn_process(element) else: self.old_dofn_process(element) except BaseException as exn:
