Repository: beam Updated Branches: refs/heads/master 4d0e8ecf1 -> b3d962df2
Remove the usage of OldDoFn and clean up function names Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/137d392e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/137d392e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/137d392e Branch: refs/heads/master Commit: 137d392e79080f67a24fa418c474e4f852d1dd74 Parents: 4d0e8ec Author: Sourabh Bajaj <[email protected]> Authored: Sun Feb 5 11:11:25 2017 -0800 Committer: Sourabh Bajaj <[email protected]> Committed: Sun Feb 5 16:01:54 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.pxd | 12 +- sdks/python/apache_beam/runners/common.py | 225 +++++++------------ .../runners/direct/transform_evaluator.py | 15 +- sdks/python/apache_beam/transforms/core.py | 87 +------ sdks/python/apache_beam/typehints/typecheck.py | 133 ----------- 5 files changed, 92 insertions(+), 380 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index dbb08f0..f36fdd0 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -37,22 +37,20 @@ cdef class DoFnRunner(Receiver): cdef object tagged_receivers cdef LoggingContext logging_context cdef object step_name - cdef bint is_new_dofn cdef list args cdef dict kwargs cdef ScopedMetricsContainer scoped_metrics_container cdef list side_inputs - cdef bint has_windowed_side_inputs + cdef bint has_windowed_inputs cdef list placeholders - cdef bint simple_process + cdef bint use_simple_invoker cdef Receiver main_receivers cpdef process(self, WindowedValue element) - cdef old_dofn_process(self, WindowedValue element) - cdef new_dofn_process(self, WindowedValue element) - cdef new_dofn_simple_process(self, WindowedValue element) - cdef _new_dofn_window_process( + cdef _dofn_invoker(self, WindowedValue element) + cdef _dofn_simple_invoker(self, WindowedValue element) + cdef _dofn_window_invoker( self, WindowedValue element, list args, dict kwargs, object window) @cython.locals(windowed_value=WindowedValue) http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 9c942c0..aa6c2dd 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -115,141 +115,89 @@ class DoFnRunner(Receiver): assert context is not None self.context = context - # TODO(Sourabhbajaj): Remove the usage of OldDoFn - if isinstance(fn, core.DoFn): - - class ArgPlaceholder(object): - def __init__(self, placeholder): - self.placeholder = placeholder - - self.is_new_dofn = True + class ArgPlaceholder(object): + def __init__(self, placeholder): + self.placeholder = placeholder + + # Stash values for use in dofn_process. + self.side_inputs = side_inputs + self.has_windowed_inputs = not all( + si.is_globally_windowed() for si in self.side_inputs) + + self.args = args if args else [] + self.kwargs = kwargs if kwargs else {} + self.dofn = fn + self.dofn_process = fn.process + + arguments, _, _, defaults = self.dofn.get_function_arguments('process') + defaults = defaults if defaults else [] + self_in_args = int(self.dofn.is_process_bounded()) + + self.use_simple_invoker = ( + not side_inputs and not args and not kwargs and not defaults) + if self.use_simple_invoker: + # As we're using the simple invoker we don't need to compute placeholders + return - # Stash values for use in new_dofn_process. - self.side_inputs = side_inputs - self.has_windowed_side_inputs = not all( - si.is_globally_windowed() for si in self.side_inputs) + self.has_windowed_inputs = (self.has_windowed_inputs or + core.DoFn.WindowParam in defaults) - self.args = args if args else [] - self.kwargs = kwargs if kwargs else {} - self.dofn = fn - self.dofn_process = fn.process + # Try to prepare all the arguments that can just be filled in + # without any additional work. in the process function. + # Also cache all the placeholders needed in the process function. - arguments, _, _, defaults = self.dofn.get_function_arguments('process') - defaults = defaults if defaults else [] - self_in_args = int(self.dofn.is_process_bounded()) - - self.simple_process = ( - not side_inputs and not args and not kwargs and not defaults) - if self.simple_process: - return - - # TODO(Sourabhbajaj) Rename this variable once oldDoFn is deprecated - self.has_windowed_side_inputs = ( - self.has_windowed_side_inputs or - core.DoFn.WindowParam in defaults) - - # Try to prepare all the arguments that can just be filled in - # without any additional work. in the process function. - # Also cache all the placeholders needed in the process function. - - # Fill in sideInputs if they are globally windowed - if not self.has_windowed_side_inputs: - self.args, self.kwargs = util.insert_values_in_args( - args, kwargs, [si[global_window] for si in side_inputs]) - - # Create placeholder for element parameter - if core.DoFn.ElementParam not in defaults: - args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args - final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \ - self.args[:args_to_pick] - else: - args_to_pick = len(arguments) - len(defaults) - self_in_args - final_args = self.args[:args_to_pick] - - # Fill the OtherPlaceholders for context, window or timestamp - args = iter(self.args[args_to_pick:]) - for a, d in zip(arguments[-len(defaults):], defaults): - if d == core.DoFn.ElementParam: - final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.ContextParam: - final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.WindowParam: - final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.TimestampParam: - final_args.append(ArgPlaceholder(d)) - elif d == core.DoFn.SideInputParam: - # If no more args are present then the value must be passed via kwarg - try: - final_args.append(args.next()) - except StopIteration: - if a not in self.kwargs: - raise ValueError("Value for sideinput %s not provided" % a) - else: - # If no more args are present then the value must be passed via kwarg - try: - final_args.append(args.next()) - except StopIteration: - pass - final_args.extend(list(args)) - self.args = final_args - - # Stash the list of placeholder positions for performance - self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args) - if isinstance(x, ArgPlaceholder)] + # Fill in sideInputs if they are globally windowed + if not self.has_windowed_inputs: + self.args, self.kwargs = util.insert_values_in_args( + args, kwargs, [si[global_window] for si in side_inputs]) + # Create placeholder for element parameter + if core.DoFn.ElementParam not in defaults: + args_to_pick = len(arguments) - len(defaults) - 1 - self_in_args + final_args = [ArgPlaceholder(core.DoFn.ElementParam)] + \ + self.args[:args_to_pick] else: - self.is_new_dofn = False - self.has_windowed_side_inputs = False # Set to True in one case below. - if not args and not kwargs: - self.dofn = fn - self.dofn_process = fn.process + args_to_pick = len(arguments) - len(defaults) - self_in_args + final_args = self.args[:args_to_pick] + + # Fill the OtherPlaceholders for context, window or timestamp + args = iter(self.args[args_to_pick:]) + for a, d in zip(arguments[-len(defaults):], defaults): + if d == core.DoFn.ElementParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.DoFn.ContextParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.DoFn.WindowParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.DoFn.TimestampParam: + final_args.append(ArgPlaceholder(d)) + elif d == core.DoFn.SideInputParam: + # If no more args are present then the value must be passed via kwarg + try: + final_args.append(args.next()) + except StopIteration: + if a not in self.kwargs: + raise ValueError("Value for sideinput %s not provided" % a) else: - if side_inputs and all( - side_input.is_globally_windowed() for side_input in side_inputs): - args, kwargs = util.insert_values_in_args( - args, kwargs, [side_input[global_window] - for side_input in side_inputs]) - side_inputs = [] - if side_inputs: - self.has_windowed_side_inputs = True - - def process(context): - w = context.windows[0] - cur_args, cur_kwargs = util.insert_values_in_args( - args, kwargs, [side_input[w] for side_input in side_inputs]) - return fn.process(context, *cur_args, **cur_kwargs) - self.dofn_process = process - elif kwargs: - self.dofn_process = lambda context: fn.process( - context, *args, **kwargs) - else: - self.dofn_process = lambda context: fn.process(context, *args) - - class CurriedFn(core.OldDoFn): - - start_bundle = staticmethod(fn.start_bundle) - process = staticmethod(self.dofn_process) - finish_bundle = staticmethod(fn.finish_bundle) - - self.dofn = CurriedFn() + # If no more args are present then the value must be passed via kwarg + try: + final_args.append(args.next()) + except StopIteration: + pass + final_args.extend(list(args)) + self.args = final_args + + # Stash the list of placeholder positions for performance + self.placeholders = [(i, x.placeholder) for (i, x) in enumerate(self.args) + if isinstance(x, ArgPlaceholder)] def receive(self, windowed_value): self.process(windowed_value) - def old_dofn_process(self, element): - if self.has_windowed_side_inputs and len(element.windows) > 1: - for w in element.windows: - self.context.set_element( - WindowedValue(element.value, element.timestamp, (w,))) - self._process_outputs(element, self.dofn_process(self.context)) - else: - self.context.set_element(element) - self._process_outputs(element, self.dofn_process(self.context)) - - def new_dofn_simple_process(self, element): + def _dofn_simple_invoker(self, element): self._process_outputs(element, self.dofn_process(element.value)) - def _new_dofn_window_process(self, element, args, kwargs, window): + def _dofn_window_invoker(self, element, args, kwargs, window): # TODO(sourabhbajaj): Investigate why we can't use `is` instead of == for i, p in self.placeholders: if p == core.DoFn.ElementParam: @@ -265,18 +213,18 @@ class DoFnRunner(Receiver): else: self._process_outputs(element, self.dofn_process(*args, **kwargs)) - def new_dofn_process(self, element): + def _dofn_invoker(self, element): self.context.set_element(element) # Call for the process function for each window if has windowed side inputs # or if the process accesses the window parameter. We can just call it once # otherwise as none of the arguments are changing - if self.has_windowed_side_inputs: + if self.has_windowed_inputs: for w in element.windows: args, kwargs = util.insert_values_in_args( self.args, self.kwargs, [si[w] for si in self.side_inputs]) - self._new_dofn_window_process(element, args, kwargs, w) + self._dofn_window_invoker(element, args, kwargs, w) else: - self._new_dofn_window_process(element, self.args, self.kwargs, None) + self._dofn_window_invoker(element, self.args, self.kwargs, None) def _invoke_bundle_method(self, method): try: @@ -285,15 +233,11 @@ class DoFnRunner(Receiver): self.context.set_element(None) f = getattr(self.dofn, method) - # TODO(Sourabhbajaj): Remove this if-else - if self.is_new_dofn: - _, _, _, defaults = self.dofn.get_function_arguments(method) - defaults = defaults if defaults else [] - args = [self.context if d == core.DoFn.ContextParam else d - for d in defaults] - self._process_outputs(None, f(*args)) - else: - self._process_outputs(None, f(self.context)) + _, _, _, defaults = self.dofn.get_function_arguments(method) + defaults = defaults if defaults else [] + args = [self.context if d == core.DoFn.ContextParam else d + for d in defaults] + self._process_outputs(None, f(*args)) except BaseException as exn: self.reraise_augmented(exn) finally: @@ -310,13 +254,10 @@ class DoFnRunner(Receiver): try: self.logging_context.enter() self.scoped_metrics_container.enter() - if self.is_new_dofn: - if self.simple_process: - self.new_dofn_simple_process(element) - else: - self.new_dofn_process(element) + if self.use_simple_invoker: + self._dofn_simple_invoker(element) else: - self.old_dofn_process(element) + self._dofn_invoker(element) except BaseException as exn: self.reraise_augmented(exn) finally: http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 3053fd3..1700de6 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -35,10 +35,8 @@ from apache_beam.transforms import sideinputs from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn -from apache_beam.typehints.typecheck import OutputCheckWrapperOldDoFn from apache_beam.typehints.typecheck import TypeCheckError from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn -from apache_beam.typehints.typecheck import TypeCheckWrapperOldDoFn from apache_beam.utils import counters from apache_beam.utils.pipeline_options import TypeOptions @@ -350,18 +348,9 @@ class _ParDoEvaluator(_TransformEvaluator): pipeline_options = self._evaluation_context.pipeline_options if (pipeline_options is not None and pipeline_options.view_as(TypeOptions).runtime_type_check): - # TODO(sourabhbajaj): Remove this if-else - if isinstance(dofn, core.DoFn): - dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints()) - else: - dofn = TypeCheckWrapperOldDoFn(dofn, transform.get_type_hints()) + dofn = TypeCheckWrapperDoFn(dofn, transform.get_type_hints()) - # TODO(sourabhbajaj): Remove this if-else - if isinstance(dofn, core.DoFn): - dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label) - else: - dofn = OutputCheckWrapperOldDoFn(dofn, - self._applied_ptransform.full_label) + dofn = OutputCheckWrapperDoFn(dofn, self._applied_ptransform.full_label) self.runner = DoFnRunner( dofn, transform.args, transform.kwargs, self._side_inputs, http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 91de7f6..e010cbd 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -21,7 +21,6 @@ from __future__ import absolute_import import copy import inspect -import warnings import types from apache_beam import pvalue @@ -216,88 +215,6 @@ class DoFn(WithTypeHints, HasDisplayData): return True -# TODO(Sourabh): Remove after migration to DoFn -class OldDoFn(WithTypeHints, HasDisplayData): - """A function object used by a transform with custom processing. - - The ParDo transform is such a transform. The ParDo.expand() - method will take an object of type DoFn and apply it to all elements of a - PCollection object. - - In order to have concrete DoFn objects one has to subclass from DoFn and - define the desired behavior (start_bundle/finish_bundle and process) or wrap a - callable object using the CallableWrapperDoFn class. - """ - - def __init__(self): - warnings.warn('Use of OldDoFn is deprecated please use DoFn instead') - super(OldDoFn, self).__init__() - - def default_label(self): - return self.__class__.__name__ - - def infer_output_type(self, input_type): - # TODO(robertwb): Side inputs types. - # TODO(robertwb): Assert compatibility with input type hint? - return self._strip_output_annotations( - trivial_inference.infer_return_type(self.process, [input_type])) - - def start_bundle(self, context): - """Called before a bundle of elements is processed on a worker. - - Elements to be processed are split into bundles and distributed - to workers. Before a worker calls process() on the first element - of its bundle, it calls this method. - - Args: - context: a DoFnContext object - """ - pass - - def finish_bundle(self, context): - """Called after a bundle of elements is processed on a worker. - - Args: - context: a DoFnContext object - """ - pass - - def process(self, context, *args, **kwargs): - """Called for each element of a pipeline. - - Args: - context: a DoFnProcessContext object containing, among other - attributes, the element to be processed. - See the DoFnProcessContext documentation for details. - *args: side inputs - **kwargs: keyword side inputs - """ - raise NotImplementedError - - @staticmethod - def from_callable(fn): - return CallableWrapperDoFn(fn) - - def process_argspec_fn(self): - """Returns the Python callable that will eventually be invoked. - - This should ideally be the user-level function that is called with - the main and (if any) side inputs, and is used to relate the type - hint parameters with the input parameters (e.g., by argument name). - """ - return self.process - - def _strip_output_annotations(self, type_hint): - annotations = (TimestampedValue, WindowedValue, pvalue.SideOutputValue) - # TODO(robertwb): These should be parameterized types that the - # type inferencer understands. - if (type_hint in annotations - or trivial_inference.element_type(type_hint) in annotations): - return Any - else: - return type_hint - - def _fn_takes_side_inputs(fn): try: argspec = inspect.getargspec(fn) @@ -679,7 +596,7 @@ class ParDo(PTransformWithSideInputs): def __init__(self, fn_or_label, *args, **kwargs): super(ParDo, self).__init__(fn_or_label, *args, **kwargs) - if not isinstance(self.fn, (OldDoFn, DoFn)): + if not isinstance(self.fn, DoFn): raise TypeError('ParDo must be called with a DoFn instance.') def default_type_hints(self): @@ -690,7 +607,7 @@ class ParDo(PTransformWithSideInputs): self.fn.infer_output_type(input_type)) def make_fn(self, fn): - if isinstance(fn, (OldDoFn, DoFn)): + if isinstance(fn, DoFn): return fn return CallableWrapperDoFn(fn) http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/typehints/typecheck.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index bab5bb0..defa71e 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -24,7 +24,6 @@ import types from apache_beam.pvalue import SideOutputValue from apache_beam.transforms.core import DoFn -from apache_beam.transforms.core import OldDoFn from apache_beam.transforms.window import WindowedValue from apache_beam.typehints import check_constraint from apache_beam.typehints import CompositeTypeHintError @@ -35,138 +34,6 @@ from apache_beam.typehints.decorators import _check_instance_type from apache_beam.typehints.decorators import getcallargs_forhints -# TODO(Sourabh): Remove after migration to DoFn -class TypeCheckWrapperOldDoFn(OldDoFn): - """A wrapper around a DoFn which performs type-checking of input and output. - """ - - def __init__(self, dofn, type_hints, label=None): - super(TypeCheckWrapperDoFn, self).__init__() - self._dofn = dofn - self._label = label - self._process_fn = self._dofn.process_argspec_fn() - if type_hints.input_types: - input_args, input_kwargs = type_hints.input_types - self._input_hints = getcallargs_forhints( - self._process_fn, *input_args, **input_kwargs) - else: - self._input_hints = None - # TODO(robertwb): Actually extract this. - self.context_var = 'context' - # TODO(robertwb): Multi-output. - self._output_type_hint = type_hints.simple_output_type(label) - - def start_bundle(self, context): - return self._type_check_result( - self._dofn.start_bundle(context)) - - def finish_bundle(self, context): - return self._type_check_result( - self._dofn.finish_bundle(context)) - - def process(self, context, *args, **kwargs): - if self._input_hints: - actual_inputs = inspect.getcallargs( - self._process_fn, context.element, *args, **kwargs) - for var, hint in self._input_hints.items(): - if hint is actual_inputs[var]: - # self parameter - continue - var_name = var + '.element' if var == self.context_var else var - _check_instance_type(hint, actual_inputs[var], var_name, True) - return self._type_check_result(self._dofn.process(context, *args, **kwargs)) - - def _type_check_result(self, transform_results): - if self._output_type_hint is None or transform_results is None: - return transform_results - - def type_check_output(o): - # TODO(robertwb): Multi-output. - x = o.value if isinstance(o, (SideOutputValue, WindowedValue)) else o - self._type_check(self._output_type_hint, x, is_input=False) - - # If the return type is a generator, then we will need to interleave our - # type-checking with its normal iteration so we don't deplete the - # generator initially just by type-checking its yielded contents. - if isinstance(transform_results, types.GeneratorType): - return GeneratorWrapper(transform_results, type_check_output) - else: - for o in transform_results: - type_check_output(o) - return transform_results - - def _type_check(self, type_constraint, datum, is_input): - """Typecheck a PTransform related datum according to a type constraint. - - This function is used to optionally type-check either an input or an output - to a PTransform. - - Args: - type_constraint: An instance of a typehints.TypeContraint, one of the - white-listed builtin Python types, or a custom user class. - datum: An instance of a Python object. - is_input: True if 'datum' is an input to a PTransform's DoFn. False - otherwise. - - Raises: - TypeError: If 'datum' fails to type-check according to 'type_constraint'. - """ - datum_type = 'input' if is_input else 'output' - - try: - check_constraint(type_constraint, datum) - except CompositeTypeHintError as e: - raise TypeCheckError, e.message, sys.exc_info()[2] - except SimpleTypeHintError: - error_msg = ("According to type-hint expected %s should be of type %s. " - "Instead, received '%s', an instance of type %s." - % (datum_type, type_constraint, datum, type(datum))) - raise TypeCheckError, error_msg, sys.exc_info()[2] - - -# TODO(Sourabh): Remove after migration to DoFn -class OutputCheckWrapperOldDoFn(OldDoFn): - """A DoFn that verifies against common errors in the output type.""" - - def __init__(self, dofn, full_label): - self.dofn = dofn - self.full_label = full_label - - def run(self, method, context, args, kwargs): - try: - result = method(context, *args, **kwargs) - except TypeCheckError as e: - error_msg = ('Runtime type violation detected within ParDo(%s): ' - '%s' % (self.full_label, e)) - raise TypeCheckError, error_msg, sys.exc_info()[2] - else: - return self._check_type(result) - - def start_bundle(self, context): - return self.run(self.dofn.start_bundle, context, [], {}) - - def finish_bundle(self, context): - return self.run(self.dofn.finish_bundle, context, [], {}) - - def process(self, context, *args, **kwargs): - return self.run(self.dofn.process, context, args, kwargs) - - def _check_type(self, output): - if output is None: - return output - elif isinstance(output, (dict, basestring)): - object_type = type(output).__name__ - raise TypeCheckError('Returning a %s from a ParDo or FlatMap is ' - 'discouraged. Please use list("%s") if you really ' - 'want this behavior.' % - (object_type, output)) - elif not isinstance(output, collections.Iterable): - raise TypeCheckError('FlatMap and ParDo must return an ' - 'iterable. %s was returned instead.' - % type(output)) - return output - - class AbstractDoFnWrapper(DoFn): """An abstract class to create wrapper around DoFn"""
