Repository: beam Updated Branches: refs/heads/master afb96d72a -> 009469972
Updates DoFn invocation logic to be more extensible. Adds following abstractions. DoFnSignature: describes the signature of a given DoFn object. DoFnInvoker: defines a particular way for invoking DoFn methods. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7db375d4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7db375d4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7db375d4 Branch: refs/heads/master Commit: 7db375d40f9b230d989c087ccfc08844c29afdac Parents: afb96d7 Author: [email protected] <[email protected]> Authored: Fri Apr 7 13:41:28 2017 -0700 Committer: [email protected] <[email protected]> Committed: Tue Apr 25 23:30:01 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.pxd | 62 +++- sdks/python/apache_beam/runners/common.py | 441 ++++++++++++++++-------- 2 files changed, 344 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7db375d4/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 5952942..f3395c1 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -28,32 +28,62 @@ cdef class Receiver(object): cpdef receive(self, WindowedValue windowed_value) -cdef class DoFnRunner(Receiver): +cdef class DoFnMethodWrapper(object): + cdef public object args + cdef public object defaults + cdef public object method_value - cdef object dofn - cdef object dofn_process - cdef object window_fn + +cdef class DoFnSignature(object): + cdef public DoFnMethodWrapper process_method + cdef public DoFnMethodWrapper start_bundle_method + cdef public DoFnMethodWrapper finish_bundle_method + cdef public object 
do_fn + + +cdef class DoFnInvoker(object): + cdef public DoFnSignature signature + cdef OutputProcessor output_processor + + cpdef invoke_process(self, WindowedValue windowed_value) + cpdef invoke_start_bundle(self) + cpdef invoke_finish_bundle(self) + + # TODO(chamikara) define static method create_invoker() here. + + +cdef class SimpleInvoker(DoFnInvoker): + cdef object process_method + + +cdef class PerWindowInvoker(DoFnInvoker): + cdef list side_inputs + cdef DoFnContext context + cdef list args_for_process + cdef dict kwargs_for_process + cdef list placeholders + cdef bint has_windowed_inputs + cdef object process_method + + +cdef class DoFnRunner(Receiver): cdef DoFnContext context - cdef object tagged_receivers cdef LoggingContext logging_context cdef object step_name - cdef list args - cdef dict kwargs cdef ScopedMetricsContainer scoped_metrics_container cdef list side_inputs - cdef bint has_windowed_inputs - cdef list placeholders - cdef bint use_simple_invoker + cdef DoFnInvoker do_fn_invoker + + cpdef process(self, WindowedValue windowed_value) - cdef Receiver main_receivers - cpdef process(self, WindowedValue element) - cdef _dofn_invoker(self, WindowedValue element) - cdef _dofn_simple_invoker(self, WindowedValue element) - cdef _dofn_per_window_invoker(self, WindowedValue element) +cdef class OutputProcessor(object): + cdef object window_fn + cdef Receiver main_receivers + cdef object tagged_receivers @cython.locals(windowed_value=WindowedValue) - cpdef _process_outputs(self, WindowedValue element, results) + cpdef process_outputs(self, WindowedValue element, results) cdef class DoFnContext(object): http://git-wip-us.apache.org/repos/asf/beam/blob/7db375d4/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 64d6d00..08071a6 100644 --- a/sdks/python/apache_beam/runners/common.py +++ 
b/sdks/python/apache_beam/runners/common.py @@ -51,6 +51,262 @@ class Receiver(object): raise NotImplementedError +class DoFnMethodWrapper(object): + """Represents a method of a DoFn object.""" + + def __init__(self, do_fn, method_name): + """ + Initializes a ``DoFnMethodWrapper``. + + Args: + do_fn: A DoFn object that contains the method. + method_name: name of the method as a string. + """ + + args, _, _, defaults = do_fn.get_function_arguments(method_name) + defaults = defaults if defaults else [] + method_value = getattr(do_fn, method_name) + self.method_value = method_value + self.args = args + self.defaults = defaults + + +class DoFnSignature(object): + """Represents the signature of a given ``DoFn`` object. + + The signature of a ``DoFn`` provides a view of the properties of a ``DoFn``. + Among other things, this gives an extensible way of (1) accessing the + structure of the ``DoFn`` including methods and method parameters + (2) identifying features that a given ``DoFn`` supports, for example, whether + a given ``DoFn`` is a Splittable ``DoFn`` ( + https://s.apache.org/splittable-do-fn) (3) validating a ``DoFn`` based on the + feature set offered by it. + """ + + def __init__(self, do_fn): + # We add a property here for all methods defined by Beam DoFn features. + + assert isinstance(do_fn, core.DoFn) + self.do_fn = do_fn + + self.process_method = DoFnMethodWrapper(do_fn, 'process') + self.start_bundle_method = DoFnMethodWrapper(do_fn, 'start_bundle') + self.finish_bundle_method = DoFnMethodWrapper(do_fn, 'finish_bundle') + self._validate() + + def _validate(self): + self._validate_bundle_method(self.start_bundle_method) + self._validate_bundle_method(self.finish_bundle_method) + + def _validate_bundle_method(self, method_wrapper): + # Bundle methods may only contain ContextParam. + + # Here we use the fact that every DoFn parameter defined in core.DoFn has + # a value equal to its own name, and that this name ends with the + # string 'Param'.
+ unsupported_dofn_params = [i for i in core.DoFn.__dict__ if ( + i.endswith('Param') and i != 'ContextParam')] + + for param in unsupported_dofn_params: + assert param not in method_wrapper.defaults + + +class DoFnInvoker(object): + """An abstraction that can be used to execute DoFn methods. + + A DoFnInvoker describes a particular way of invoking methods of a DoFn + represented by a given DoFnSignature.""" + + def __init__(self, output_processor, signature): + self.output_processor = output_processor + self.signature = signature + + @staticmethod + def create_invoker( + output_processor, + signature, context, side_inputs, input_args, input_kwargs): + """Creates a new DoFnInvoker based on the given arguments. + + Args: + signature: a DoFnSignature for the DoFn being invoked. + context: Context to be used when invoking the DoFn (deprecated). + side_inputs: side inputs to be used when invoking the process method. + input_args: arguments to be used when invoking the process method. + input_kwargs: kwargs to be used when invoking the process method. + """ + default_arg_values = signature.process_method.defaults + use_simple_invoker = ( + not side_inputs and not input_args and not input_kwargs and + not default_arg_values) + if use_simple_invoker: + return SimpleInvoker(output_processor, signature) + else: + return PerWindowInvoker( + output_processor, + signature, context, side_inputs, input_args, input_kwargs) + + def invoke_process(self, windowed_value): + """Invokes the DoFn.process() function. + + Args: + windowed_value: a WindowedValue object that gives the element for which + the process() method should be invoked along with the window + the element belongs to. + """ + raise NotImplementedError + + def invoke_start_bundle(self): + """Invokes the DoFn.start_bundle() method.
+ """ + defaults = self.signature.start_bundle_method.defaults + args = [self.context if d == core.DoFn.ContextParam else d + for d in defaults] + self.output_processor.process_outputs( + None, self.signature.start_bundle_method.method_value(*args)) + + def invoke_finish_bundle(self): + """Invokes the DoFn.finish_bundle() method. + """ + defaults = self.signature.finish_bundle_method.defaults + args = [self.context if d == core.DoFn.ContextParam else d + for d in defaults] + self.output_processor.process_outputs( + None, self.signature.finish_bundle_method.method_value(*args)) + + +class SimpleInvoker(DoFnInvoker): + """An invoker that processes elements ignoring windowing information.""" + + def __init__(self, output_processor, signature): + super(SimpleInvoker, self).__init__(output_processor, signature) + self.process_method = signature.process_method.method_value + + def invoke_process(self, windowed_value): + self.output_processor.process_outputs( + windowed_value, self.process_method(windowed_value.value)) + + +class PerWindowInvoker(DoFnInvoker): + """An invoker that processes elements considering windowing information.""" + + def __init__(self, output_processor, signature, context, + side_inputs, input_args, input_kwargs): + super(PerWindowInvoker, self).__init__(output_processor, signature) + self.side_inputs = side_inputs + self.context = context + self.process_method = signature.process_method.method_value + default_arg_values = signature.process_method.defaults + self.has_windowed_inputs = ( + not all(si.is_globally_windowed() for si in side_inputs) or + (core.DoFn.WindowParam in default_arg_values)) + + # Try to prepare all the arguments that can just be filled in + # without any additional work in the process function. + # Also cache all the placeholders needed in the process function.
+ + # Fill in sideInputs if they are globally windowed + global_window = GlobalWindow() + + input_args = input_args if input_args else [] + input_kwargs = input_kwargs if input_kwargs else {} + + if not self.has_windowed_inputs: + input_args, input_kwargs = util.insert_values_in_args( + input_args, input_kwargs, [si[global_window] for si in side_inputs]) + + arguments = signature.process_method.args + defaults = signature.process_method.defaults + + # Create placeholder for element parameter of DoFn.process() method. + self_in_args = int(signature.do_fn.is_process_bounded()) + + class ArgPlaceholder(object): + def __init__(self, placeholder): + self.placeholder = placeholder + + if core.DoFn.ElementParam not in default_arg_values: + args_to_pick = len(arguments) - len(default_arg_values) - 1 - self_in_args + args_with_placeholders = ( + [ArgPlaceholder(core.DoFn.ElementParam)] + input_args[:args_to_pick]) + else: + args_to_pick = len(arguments) - len(defaults) - self_in_args + args_with_placeholders = input_args[:args_to_pick] + + # Fill the OtherPlaceholders for context, window or timestamp + remaining_args_iter = iter(input_args[args_to_pick:]) + for a, d in zip(arguments[-len(defaults):], defaults): + if d == core.DoFn.ElementParam: + args_with_placeholders.append(ArgPlaceholder(d)) + elif d == core.DoFn.ContextParam: + args_with_placeholders.append(ArgPlaceholder(d)) + elif d == core.DoFn.WindowParam: + args_with_placeholders.append(ArgPlaceholder(d)) + elif d == core.DoFn.TimestampParam: + args_with_placeholders.append(ArgPlaceholder(d)) + elif d == core.DoFn.SideInputParam: + # If no more args are present then the value must be passed via kwarg + try: + args_with_placeholders.append(remaining_args_iter.next()) + except StopIteration: + if a not in input_kwargs: + raise ValueError("Value for sideinput %s not provided" % a) + else: + # If no more args are present then the value must be passed via kwarg + try: + 
args_with_placeholders.append(remaining_args_iter.next()) + except StopIteration: + pass + args_with_placeholders.extend(list(remaining_args_iter)) + + # Stash the list of placeholder positions for performance + self.placeholders = [(i, x.placeholder) for (i, x) in enumerate( + args_with_placeholders) + if isinstance(x, ArgPlaceholder)] + + self.args_for_process = args_with_placeholders + self.kwargs_for_process = input_kwargs + + def invoke_process(self, windowed_value): + self.context.set_element(windowed_value) + # Call the process function once per window if there are windowed side + # inputs or if the process method accesses the window parameter. Otherwise + # it is called just once, as none of the arguments change. + if self.has_windowed_inputs and len(windowed_value.windows) != 1: + for w in windowed_value.windows: + self._invoke_per_window( + WindowedValue(windowed_value.value, windowed_value.timestamp, (w,))) + else: + self._invoke_per_window(windowed_value) + + def _invoke_per_window(self, windowed_value): + if self.has_windowed_inputs: + window, = windowed_value.windows + args_for_process, kwargs_for_process = util.insert_values_in_args( + self.args_for_process, self.kwargs_for_process, + [si[window] for si in self.side_inputs]) + else: + args_for_process, kwargs_for_process = ( + self.args_for_process, self.kwargs_for_process) + # TODO(sourabhbajaj): Investigate why we can't use `is` instead of == + for i, p in self.placeholders: + if p == core.DoFn.ElementParam: + args_for_process[i] = windowed_value.value + elif p == core.DoFn.ContextParam: + args_for_process[i] = self.context + elif p == core.DoFn.WindowParam: + args_for_process[i] = window + elif p == core.DoFn.TimestampParam: + args_for_process[i] = windowed_value.timestamp + + if kwargs_for_process: + self.output_processor.process_outputs( + windowed_value, + self.process_method(*args_for_process, **kwargs_for_process)) + else: + self.output_processor.process_outputs( + windowed_value,
self.process_method(*args_for_process)) + + class DoFnRunner(Receiver): """A helper class for executing ParDo operations. """ @@ -88,13 +344,9 @@ class DoFnRunner(Receiver): state: handle for accessing DoFn state scoped_metrics_container: Context switcher for metrics container """ - self.step_name = step_name - self.window_fn = windowing.windowfn - self.tagged_receivers = tagged_receivers self.scoped_metrics_container = (scoped_metrics_container or ScopedMetricsContainer()) - - global_window = GlobalWindow() + self.step_name = step_name # Need to support multiple iterations. side_inputs = list(side_inputs) @@ -104,172 +356,59 @@ class DoFnRunner(Receiver): else: self.logging_context = get_logging_context(logger, step_name=step_name) - # Optimize for the common case. - self.main_receivers = as_receiver(tagged_receivers[None]) - # TODO(sourabh): Deprecate the use of context if state: assert context is None - self.context = DoFnContext(self.step_name, state=state) + context = DoFnContext(step_name, state=state) else: assert context is not None - self.context = context + context = context - class ArgPlaceholder(object): - def __init__(self, placeholder): - self.placeholder = placeholder - - # Stash values for use in dofn_process. 
- self.side_inputs = side_inputs - self.has_windowed_inputs = not all( - si.is_globally_windowed() for si in self.side_inputs) + self.context = context - self.args = args if args else [] - self.kwargs = kwargs if kwargs else {} - self.dofn = fn - self.dofn_process = fn.process - - arguments, _, _, defaults = self.dofn.get_function_arguments('process') - defaults = defaults if defaults else [] - self_in_args = int(self.dofn.is_process_bounded()) + do_fn_signature = DoFnSignature(fn) - self.use_simple_invoker = ( - not side_inputs and not args and not kwargs and not defaults) - if self.use_simple_invoker: - # As we're using the simple invoker we don't need to compute placeholders - return - - self.has_windowed_inputs = (self.has_windowed_inputs or - core.DoFn.WindowParam in defaults) - - # Try to prepare all the arguments that can just be filled in - # without any additional work. in the process function. - # Also cache all the placeholders needed in the process function. - - # Fill in sideInputs if they are globally windowed - if not self.has_windowed_inputs: - self.args, self.kwargs = util.insert_values_in_args( - args, kwargs, [si[global_window] for si in side_inputs]) - - # Create placeholder for element parameter - if core.DoFn.ElementParam not in defaults: - args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args - final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \ - self.args[:args_to_pick] - else: - args_to_pick = len(arguments) - len(defaults) - self_in_args - final_args = self.args[:args_to_pick] - - # Fill the OtherPlaceholders for context, window or timestamp - args = iter(self.args[args_to_pick:]) - for a, d in zip(arguments[-len(defaults):], defaults): - if d == core.DoFn.ElementParam: - final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.ContextParam: - final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.WindowParam: - final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.TimestampParam: - 
final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.SideInputParam: - # If no more args are present then the value must be passed via kwarg - try: - final_args.append(args.next()) - except StopIteration: - if a not in self.kwargs: - raise ValueError("Value for sideinput %s not provided" % a) - else: - # If no more args are present then the value must be passed via kwarg - try: - final_args.append(args.next()) - except StopIteration: - pass - final_args.extend(list(args)) - self.args = final_args + # Optimize for the common case. + main_receivers = as_receiver(tagged_receivers[None]) + output_processor = OutputProcessor( + windowing.windowfn, main_receivers, tagged_receivers) - # Stash the list of placeholder positions for performance - self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args) - if isinstance(x, ArgPlaceholder)] + self.do_fn_invoker = DoFnInvoker.create_invoker( + output_processor, do_fn_signature, context, side_inputs, args, kwargs) def receive(self, windowed_value): self.process(windowed_value) - def _dofn_simple_invoker(self, element): - self._process_outputs(element, self.dofn_process(element.value)) - - def _dofn_per_window_invoker(self, element): - if self.has_windowed_inputs: - window, = element.windows - args, kwargs = util.insert_values_in_args( - self.args, self.kwargs, [si[window] for si in self.side_inputs]) - else: - args, kwargs = self.args, self.kwargs - # TODO(sourabhbajaj): Investigate why we can't use `is` instead of == - for i, p in self.placeholders: - if p == core.DoFn.ElementParam: - args[i] = element.value - elif p == core.DoFn.ContextParam: - args[i] = self.context - elif p == core.DoFn.WindowParam: - args[i] = window - elif p == core.DoFn.TimestampParam: - args[i] = element.timestamp - if not kwargs: - self._process_outputs(element, self.dofn_process(*args)) - else: - self._process_outputs(element, self.dofn_process(*args, **kwargs)) - - def _dofn_invoker(self, element): - 
self.context.set_element(element) - # Call for the process function for each window if has windowed side inputs - # or if the process accesses the window parameter. We can just call it once - # otherwise as none of the arguments are changing - if self.has_windowed_inputs and len(element.windows) != 1: - for w in element.windows: - self._dofn_per_window_invoker( - WindowedValue(element.value, element.timestamp, (w,))) - else: - self._dofn_per_window_invoker(element) - - def _invoke_bundle_method(self, method): + def process(self, windowed_value): try: self.logging_context.enter() self.scoped_metrics_container.enter() - self.context.set_element(None) - f = getattr(self.dofn, method) - - _, _, _, defaults = self.dofn.get_function_arguments(method) - defaults = defaults if defaults else [] - args = [self.context if d == core.DoFn.ContextParam else d - for d in defaults] - self._process_outputs(None, f(*args)) + self.do_fn_invoker.invoke_process(windowed_value) except BaseException as exn: - self.reraise_augmented(exn) + self._reraise_augmented(exn) finally: self.scoped_metrics_container.exit() self.logging_context.exit() - def start(self): - self._invoke_bundle_method('start_bundle') - - def finish(self): - self._invoke_bundle_method('finish_bundle') - - def process(self, element): + def _invoke_bundle_method(self, bundle_method): try: self.logging_context.enter() self.scoped_metrics_container.enter() - if self.use_simple_invoker: - self._dofn_simple_invoker(element) - else: - self._dofn_invoker(element) + self.context.set_element(None) + bundle_method() except BaseException as exn: - self.reraise_augmented(exn) + self._reraise_augmented(exn) finally: self.scoped_metrics_container.exit() self.logging_context.exit() - def reraise_augmented(self, exn): + def start(self): + self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle) + + def finish(self): + self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle) + + def _reraise_augmented(self, exn): if 
getattr(exn, '_tagged_with_step', False) or not self.step_name: raise args = exn.args @@ -280,7 +419,23 @@ class DoFnRunner(Receiver): else: raise - def _process_outputs(self, windowed_input_element, results): + +class OutputProcessor(object): + """Processes output produced by DoFn method invocations.""" + + def __init__(self, window_fn, main_receivers, tagged_receivers): + """Initializes ``OutputProcessor``. + + Args: + window_fn: a windowing function (WindowFn). + main_receivers: main receiver object. + tagged_receivers: a dict of tag name to Receiver objects. + """ + self.window_fn = window_fn + self.main_receivers = main_receivers + self.tagged_receivers = tagged_receivers + + def process_outputs(self, windowed_input_element, results): """Dispatch the result of computation to the appropriate receivers. A value wrapped in a OutputValue object will be unwrapped and
