http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/ptransform.py
b/sdks/python/google/cloud/dataflow/transforms/ptransform.py
deleted file mode 100644
index 09f8015..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/ptransform.py
+++ /dev/null
@@ -1,703 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""PTransform and descendants.
-
-A PTransform is an object describing (not executing) a computation. The actual
-execution semantics for a transform is captured by a runner object. A transform
-object always belongs to a pipeline object.
-
-A PTransform derived class needs to define the apply() method that describes
-how one or more PValues are created by the transform.
-
-The module defines a few standard transforms: FlatMap (parallel do),
-GroupByKey (group by key), etc. Note that the apply() methods for these
-classes contain code that will add nodes to the processing graph associated
-with a pipeline.
-
-As support for the FlatMap transform, the module also defines a DoFn
-class and wrapper class that allows lambda functions to be used as
-FlatMap processing functions.
-"""
-
-from __future__ import absolute_import
-
-import copy
-import inspect
-import operator
-import os
-import sys
-
-from google.cloud.dataflow import coders
-from google.cloud.dataflow import error
-from google.cloud.dataflow import pvalue
-from google.cloud.dataflow import typehints
-from google.cloud.dataflow.internal import pickler
-from google.cloud.dataflow.internal import util
-from google.cloud.dataflow.typehints import getcallargs_forhints
-from google.cloud.dataflow.typehints import TypeCheckError
-from google.cloud.dataflow.typehints import validate_composite_type_param
-from google.cloud.dataflow.typehints import WithTypeHints
-from google.cloud.dataflow.typehints.trivial_inference import instance_to_type
-
-
-class _PValueishTransform(object):
- """Visitor for PValueish objects.
-
- A PValueish is a PValue, or list, tuple, dict of PValuesish objects.
-
- This visits a PValueish, contstructing a (possibly mutated) copy.
- """
- def visit(self, node, *args):
- return getattr(
- self,
- 'visit_' + node.__class__.__name__,
- lambda x, *args: x)(node, *args)
-
- def visit_list(self, node, *args):
- return [self.visit(x, *args) for x in node]
-
- def visit_tuple(self, node, *args):
- return tuple(self.visit(x, *args) for x in node)
-
- def visit_dict(self, node, *args):
- return {key: self.visit(value, *args) for (key, value) in node.items()}
-
-
-class _SetInputPValues(_PValueishTransform):
- def visit(self, node, replacements):
- if id(node) in replacements:
- return replacements[id(node)]
- else:
- return super(_SetInputPValues, self).visit(node, replacements)
-
-
-class _MaterializedDoOutputsTuple(pvalue.DoOutputsTuple):
- def __init__(self, deferred, pvalue_cache):
- super(_MaterializedDoOutputsTuple, self).__init__(
- None, None, deferred._tags, deferred._main_tag)
- self._deferred = deferred
- self._pvalue_cache = pvalue_cache
-
- def __getitem__(self, tag):
- return self._pvalue_cache.get_unwindowed_pvalue(self._deferred[tag])
-
-
-class _MaterializePValues(_PValueishTransform):
- def __init__(self, pvalue_cache):
- self._pvalue_cache = pvalue_cache
-
- def visit(self, node):
- if isinstance(node, pvalue.PValue):
- return self._pvalue_cache.get_unwindowed_pvalue(node)
- elif isinstance(node, pvalue.DoOutputsTuple):
- return _MaterializedDoOutputsTuple(node, self._pvalue_cache)
- else:
- return super(_MaterializePValues, self).visit(node)
-
-
-class GetPValues(_PValueishTransform):
- def visit(self, node, pvalues=None):
- if pvalues is None:
- pvalues = []
- self.visit(node, pvalues)
- return pvalues
- elif isinstance(node, (pvalue.PValue, pvalue.DoOutputsTuple)):
- pvalues.append(node)
- else:
- super(GetPValues, self).visit(node, pvalues)
-
-
-class ZipPValues(_PValueishTransform):
- """Pairs each PValue in a pvalueish with a value in a parallel out sibling.
-
- Sibling should have the same nested structure as pvalueish. Leaves in
- sibling are expanded across nested pvalueish lists, tuples, and dicts.
- For example
-
- ZipPValues().visit({'a': pc1, 'b': (pc2, pc3)},
- {'a': 'A', 'b', 'B'})
-
- will return
-
- [('a', pc1, 'A'), ('b', pc2, 'B'), ('b', pc3, 'B')]
- """
-
- def visit(self, pvalueish, sibling, pairs=None, context=None):
- if pairs is None:
- pairs = []
- self.visit(pvalueish, sibling, pairs, context)
- return pairs
- elif isinstance(pvalueish, (pvalue.PValue, pvalue.DoOutputsTuple)):
- pairs.append((context, pvalueish, sibling))
- else:
- super(ZipPValues, self).visit(pvalueish, sibling, pairs, context)
-
- def visit_list(self, pvalueish, sibling, pairs, context):
- if isinstance(sibling, (list, tuple)):
- for ix, (p, s) in enumerate(zip(
- pvalueish, list(sibling) + [None] * len(pvalueish))):
- self.visit(p, s, pairs, 'position %s' % ix)
- else:
- for p in pvalueish:
- self.visit(p, sibling, pairs, context)
-
- def visit_tuple(self, pvalueish, sibling, pairs, context):
- self.visit_list(pvalueish, sibling, pairs, context)
-
- def visit_dict(self, pvalueish, sibling, pairs, context):
- if isinstance(sibling, dict):
- for key, p in pvalueish.items():
- self.visit(p, sibling.get(key), pairs, key)
- else:
- for p in pvalueish.values():
- self.visit(p, sibling, pairs, context)
-
-
-class PTransform(WithTypeHints):
- """A transform object used to modify one or more PCollections.
-
- Subclasses must define an apply() method that will be used when the transform
- is applied to some arguments. Typical usage pattern will be:
-
- input | CustomTransform(...)
-
- The apply() method of the CustomTransform object passed in will be called
- with input as an argument.
- """
- # By default, transforms don't have any side inputs.
- side_inputs = ()
-
- # Used for nullary transforms.
- pipeline = None
-
- # Default is unset.
- _user_label = None
-
- def __init__(self, label=None):
- super(PTransform, self).__init__()
- self.label = label
-
- @property
- def label(self):
- return self._user_label or self.default_label()
-
- @label.setter
- def label(self, value):
- self._user_label = value
-
- def default_label(self):
- return self.__class__.__name__
-
- @classmethod
- def parse_label_and_arg(cls, args, kwargs, arg_name):
- """Parses a tuple of positional arguments into label, arg_name.
-
- The function is used by functions that take a (label, arg_name) list of
- parameters and in which first label could be optional even if the arg_name
- is not passed as a keyword. More specifically the following calling
patterns
- are allowed::
-
- (value)
- ('label', value)
- (arg_name=value)
- ('label', arg_name=value)
- (value, label='label')
- (label='label', arg_name=value)
-
- Args:
- args: A tuple of position arguments.
- kwargs: A dictionary of keyword arguments.
- arg_name: The name of the second ergument.
-
- Returns:
- A (label, value) tuple. The label will be the one passed in or one
- derived from the class name. The value will the corresponding value for
- the arg_name argument.
-
- Raises:
- ValueError: If the label and value cannot be deduced from args and kwargs
- and also if the label is not a string.
- """
- # TODO(robertwb): Fix to not silently drop extra arguments.
- kw_label = kwargs.get('label', None)
- kw_value = kwargs.get(arg_name, None)
-
- if kw_value is not None:
- value = kw_value
- else:
- value = args[1] if len(args) > 1 else args[0] if args else None
-
- if kw_label is not None:
- label = kw_label
- else:
- # We need to get a label from positional arguments. If we did not get a
- # keyword value for the arg_name either then expect that a one element
- # list will provide the value and the label will be derived from the
class
- # name.
- num_args = len(args)
- if kw_value is None:
- label = args[0] if num_args >= 2 else cls.__name__
- else:
- label = args[0] if num_args >= 1 else cls.__name__
-
- if label is None or value is None or not isinstance(label, basestring):
- raise ValueError(
- '%s expects a (label, %s) or (%s) argument list '
- 'instead of args=%s, kwargs=%s' % (
- cls.__name__, arg_name, arg_name, args, kwargs))
- return label, value
-
- def with_input_types(self, input_type_hint):
- """Annotates the input type of a PTransform with a type-hint.
-
- Args:
- input_type_hint: An instance of an allowed built-in type, a custom class,
- or an instance of a typehints.TypeConstraint.
-
- Raises:
- TypeError: If 'type_hint' is not a valid type-hint. See
- typehints.validate_composite_type_param for further details.
-
- Returns:
- A reference to the instance of this particular PTransform object. This
- allows chaining type-hinting related methods.
- """
- validate_composite_type_param(input_type_hint,
- 'Type hints for a PTransform')
- return super(PTransform, self).with_input_types(input_type_hint)
-
- def with_output_types(self, type_hint):
- """Annotates the output type of a PTransform with a type-hint.
-
- Args:
- type_hint: An instance of an allowed built-in type, a custom class, or a
- typehints.TypeConstraint.
-
- Raises:
- TypeError: If 'type_hint' is not a valid type-hint. See
- typehints.validate_composite_type_param for further details.
-
- Returns:
- A reference to the instance of this particular PTransform object. This
- allows chaining type-hinting related methods.
- """
- validate_composite_type_param(type_hint, 'Type hints for a PTransform')
- return super(PTransform, self).with_output_types(type_hint)
-
- def type_check_inputs(self, pvalueish):
- self.type_check_inputs_or_outputs(pvalueish, 'input')
-
- def infer_output_type(self, unused_input_type):
- return self.get_type_hints().simple_output_type(self.label) or
typehints.Any
-
- def type_check_outputs(self, pvalueish):
- self.type_check_inputs_or_outputs(pvalueish, 'output')
-
- def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
- hints = getattr(self.get_type_hints(), input_or_output + '_types')
- if not hints:
- return
- arg_hints, kwarg_hints = hints
- if arg_hints and kwarg_hints:
- raise TypeCheckError(
- 'PTransform cannot have both positional and keyword type hints '
- 'without overriding %s._type_check_%s()' % (
- self.__class__, input_or_output))
- root_hint = (
- arg_hints[0] if len(arg_hints) == 1 else arg_hints or kwarg_hints)
- for context, pvalue_, hint in ZipPValues().visit(pvalueish, root_hint):
- if pvalue_.element_type is None:
- # TODO(robertwb): It's a bug that we ever get here. (typecheck)
- continue
- if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
- at_context = ' %s %s' % (input_or_output, context) if context else ''
- raise TypeCheckError(
- '%s type hint violation at %s%s: expected %s, got %s' % (
- input_or_output.title(), self.label, at_context, hint,
- pvalue_.element_type))
-
- def clone(self, new_label):
- """Clones the current transform instance under a new label."""
- transform = copy.copy(self)
- transform.label = new_label
- return transform
-
- def apply(self, input_or_inputs):
- raise NotImplementedError
-
- def __str__(self):
- return '<%s>' % self._str_internal()
-
- def __repr__(self):
- return '<%s at %s>' % (self._str_internal(), hex(id(self)))
-
- def _str_internal(self):
- return '%s(PTransform)%s%s%s' % (
- self.__class__.__name__,
- ' label=[%s]' % self.label if (hasattr(self, 'label') and
- self.label) else '',
- ' inputs=%s' % str(self.inputs) if (hasattr(self, 'inputs') and
- self.inputs) else '',
- ' side_inputs=%s' % str(self.side_inputs) if self.side_inputs else '')
-
- def _check_pcollection(self, pcoll):
- if not isinstance(pcoll, pvalue.PCollection):
- raise error.TransformError('Expecting a PCollection argument.')
- if not pcoll.pipeline:
- raise error.TransformError('PCollection not part of a pipeline.')
-
- def get_windowing(self, inputs):
- """Returns the window function to be associated with transform's output.
-
- By default most transforms just return the windowing function associated
- with the input PCollection (or the first input if several).
- """
- # TODO(robertwb): Assert all input WindowFns compatible.
- return inputs[0].windowing
-
- def __or__(self, right):
- """Used to compose PTransforms, e.g., ptransform1 | ptransform2."""
- if isinstance(right, PTransform):
- return ChainedPTransform(self, right)
- else:
- return NotImplemented
-
- def __ror__(self, left):
- """Used to apply this PTransform to non-PValues, e.g., a tuple."""
- pvalueish, pvalues = self._extract_input_pvalues(left)
- pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
- if pvalues and not pipelines:
- deferred = False
- # pylint: disable=g-import-not-at-top
- from google.cloud.dataflow import pipeline
- from google.cloud.dataflow.utils.options import PipelineOptions
- # pylint: enable=g-import-not-at-top
- p = pipeline.Pipeline(
- 'DirectPipelineRunner', PipelineOptions(sys.argv))
- else:
- if not pipelines:
- if self.pipeline is not None:
- p = self.pipeline
- else:
- raise ValueError('"%s" requires a pipeline to be specified '
- 'as there are no deferred inputs.'% self.label)
- else:
- p = self.pipeline or pipelines[0]
- for pp in pipelines:
- if p != pp:
- raise ValueError(
- 'Mixing value from different pipelines not allowed.')
- deferred = not getattr(p.runner, 'is_eager', False)
- # pylint: disable=g-import-not-at-top
- from google.cloud.dataflow.transforms.core import Create
- # pylint: enable=g-import-not-at-top
- replacements = {id(v): p | Create('CreatePInput%s' % ix, v)
- for ix, v in enumerate(pvalues)
- if not isinstance(v, pvalue.PValue) and v is not None}
- pvalueish = _SetInputPValues().visit(pvalueish, replacements)
- self.pipeline = p
- result = p.apply(self, pvalueish)
- if deferred:
- return result
- else:
- # Get a reference to the runners internal cache, otherwise runner may
- # clean it after run.
- cache = p.runner.cache
- p.run()
- return _MaterializePValues(cache).visit(result)
-
- def _extract_input_pvalues(self, pvalueish):
- """Extract all the pvalues contained in the input pvalueish.
-
- Returns pvalueish as well as the flat inputs list as the input may have to
- be copied as inspection may be destructive.
-
- By default, recursively extracts tuple components and dict values.
-
- Generally only needs to be overriden for multi-input PTransforms.
- """
- # pylint: disable=g-import-not-at-top
- from google.cloud.dataflow import pipeline
- # pylint: enable=g-import-not-at-top
- if isinstance(pvalueish, pipeline.Pipeline):
- pvalueish = pvalue.PBegin(pvalueish)
-
- def _dict_tuple_leaves(pvalueish):
- if isinstance(pvalueish, tuple):
- for a in pvalueish:
- for p in _dict_tuple_leaves(a):
- yield p
- elif isinstance(pvalueish, dict):
- for a in pvalueish.values():
- for p in _dict_tuple_leaves(a):
- yield p
- else:
- yield pvalueish
- return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
-
-
-class ChainedPTransform(PTransform):
-
- def __init__(self, *parts):
- super(ChainedPTransform, self).__init__(label=self._chain_label(parts))
- self._parts = parts
-
- def _chain_label(self, parts):
- return '|'.join(p.label for p in parts)
-
- def __or__(self, right):
- if isinstance(right, PTransform):
- # Create a flat list rather than a nested tree of composite
- # transforms for better monitoring, etc.
- return ChainedPTransform(*(self._parts + (right,)))
- else:
- return NotImplemented
-
- def apply(self, pval):
- return reduce(operator.or_, self._parts, pval)
-
-
-class PTransformWithSideInputs(PTransform):
- """A superclass for any PTransform (e.g. FlatMap or Combine)
- invoking user code.
-
- PTransforms like FlatMap invoke user-supplied code in some kind of
- package (e.g. a DoFn) and optionally provide arguments and side inputs
- to that code. This internal-use-only class contains common functionality
- for PTransforms that fit this model.
- """
-
- def __init__(self, fn_or_label, *args, **kwargs):
- if fn_or_label is None or isinstance(fn_or_label, basestring):
- label = fn_or_label
- fn, args = args[0], args[1:]
- else:
- label = None
- fn = fn_or_label
- if isinstance(fn, type) and issubclass(fn, typehints.WithTypeHints):
- # Don't treat Fn class objects as callables.
- raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
- self.fn = self.make_fn(fn)
- # Now that we figure out the label, initialize the super-class.
- super(PTransformWithSideInputs, self).__init__(label=label)
-
- if (any([isinstance(v, pvalue.PCollection) for v in args]) or
- any([isinstance(v, pvalue.PCollection) for v in kwargs.itervalues()])):
- raise error.SideInputError(
- 'PCollection used directly as side input argument. Specify '
- 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the
'
- 'PCollection is to be used.')
- self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args(
- args, kwargs, pvalue.PCollectionView)
- self.raw_side_inputs = args, kwargs
-
- # Prevent name collisions with fns of the form '<function <lambda> at ...>'
- self._cached_fn = self.fn
-
- # Ensure fn and side inputs are picklable for remote execution.
- self.fn = pickler.loads(pickler.dumps(self.fn))
- self.args = pickler.loads(pickler.dumps(self.args))
- self.kwargs = pickler.loads(pickler.dumps(self.kwargs))
-
- # For type hints, because loads(dumps(class)) != class.
- self.fn = self._cached_fn
-
- def with_input_types(
- self, input_type_hint, *side_inputs_arg_hints, **side_input_kwarg_hints):
- """Annotates the types of main inputs and side inputs for the PTransform.
-
- Args:
- input_type_hint: An instance of an allowed built-in type, a custom class,
- or an instance of a typehints.TypeConstraint.
- *side_inputs_arg_hints: A variable length argument composed of
- of an allowed built-in type, a custom class, or a
- typehints.TypeConstraint.
- **side_input_kwarg_hints: A dictionary argument composed of
- of an allowed built-in type, a custom class, or a
- typehints.TypeConstraint.
-
- Example of annotating the types of side-inputs:
- FlatMap().with_input_types(int, int, bool)
-
- Raises:
- TypeError: If 'type_hint' is not a valid type-hint. See
- typehints.validate_composite_type_param for further details.
-
- Returns:
- A reference to the instance of this particular PTransform object. This
- allows chaining type-hinting related methods.
- """
- super(PTransformWithSideInputs, self).with_input_types(input_type_hint)
-
- for si in side_inputs_arg_hints:
- validate_composite_type_param(si, 'Type hints for a PTransform')
- for si in side_input_kwarg_hints.values():
- validate_composite_type_param(si, 'Type hints for a PTransform')
-
- self.side_inputs_types = side_inputs_arg_hints
- return WithTypeHints.with_input_types(
- self, input_type_hint, *side_inputs_arg_hints,
**side_input_kwarg_hints)
-
- def type_check_inputs(self, pvalueish):
- type_hints = self.get_type_hints().input_types
- if type_hints:
- args, kwargs = self.raw_side_inputs
- def element_type(side_input):
- if isinstance(side_input, pvalue.PCollectionView):
- return side_input.element_type
- else:
- return instance_to_type(side_input)
- arg_types = [pvalueish.element_type] + [element_type(v) for v in args]
- kwargs_types = {k: element_type(v) for (k, v) in kwargs.items()}
- argspec_fn = self.process_argspec_fn()
- bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
- hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1])
- for arg, hint in hints.items():
- if arg.startswith('%unknown%'):
- continue
- if hint is None:
- continue
- if not typehints.is_consistent_with(
- bindings.get(arg, typehints.Any), hint):
- raise typehints.TypeCheckError(
- 'Type hint violation for \'%s\': requires %s but got %s for %s'
- % (self.label, hint, bindings[arg], arg))
-
- def process_argspec_fn(self):
- """Returns an argspec of the function actually consuming the data.
- """
- raise NotImplementedError
-
- def make_fn(self, fn):
- # TODO(silviuc): Add comment describing that this is meant to be overriden
- # by methods detecting callables and wrapping them in DoFns.
- return fn
-
- def default_label(self):
- return '%s(%s)' % (self.__class__.__name__, self.fn.default_label())
-
-
-class CallablePTransform(PTransform):
- """A class wrapper for a function-based transform."""
-
- def __init__(self, fn):
- # pylint: disable=super-init-not-called
- # This is a helper class for a function decorator. Only when the class
- # is called (and __call__ invoked) we will have all the information
- # needed to initialize the super class.
- self.fn = fn
-
- def __call__(self, *args, **kwargs):
- if args and args[0] is None:
- label, self._args = None, args[1:]
- elif args and isinstance(args[0], str):
- label, self._args = args[0], args[1:]
- else:
- label, self._args = None, args
- self._kwargs = kwargs
- # We know the label now, so initialize the super-class.
- super(CallablePTransform, self).__init__(label=label)
- return self
-
- def apply(self, pcoll):
- # Since the PTransform will be implemented entirely as a function
- # (once called), we need to pass through any type-hinting information that
- # may have been annotated via the .with_input_types() and
- # .with_output_types() methods.
- kwargs = dict(self._kwargs)
- args = tuple(self._args)
- try:
- if 'type_hints' in inspect.getargspec(self.fn).args:
- args = (self.get_type_hints(),) + args
- except TypeError:
- # Might not be a function.
- pass
- return self.fn(self.label, pcoll, *args, **kwargs)
-
- def default_label(self):
- if self._args:
- return '%s(%s)' % (
- label_from_callable(self.fn), label_from_callable(self._args[0]))
- else:
- return label_from_callable(self.fn)
-
-
-def ptransform_fn(fn):
- """A decorator for a function-based PTransform.
-
- Args:
- fn: A function implementing a custom PTransform.
-
- Returns:
- A CallablePTransform instance wrapping the function-based PTransform.
-
- This wrapper provides an alternative, simpler way to define a PTransform.
- The standard method is to subclass from PTransform and override the apply()
- method. An equivalent effect can be obtained by defining a function that
- takes a label, an input PCollection and additional optional arguments and
- returns a resulting PCollection. For example::
-
- @ptransform_fn
- def CustomMapper(label, pcoll, mapfn):
- return pcoll | ParDo(mapfn)
-
- The equivalent approach using PTransform subclassing::
-
- class CustomMapper(PTransform):
- def apply(self, pcoll, mapfn):
- return pcoll | ParDo(mapfn)
-
- With either method the custom PTransform can be used in pipelines as if
- it were one of the "native" PTransforms::
-
- result_pcoll = input_pcoll | CustomMapper('label', somefn)
-
- Note that for both solutions the underlying implementation of the pipe
- operator (i.e., `|`) will inject the pcoll argument in its proper place
- (first argument if no label was specified and second argument otherwise).
- """
- return CallablePTransform(fn)
-
-
-def format_full_label(applied_transform, pending_transform):
- """Returns a fully formatted cumulative PTransform label.
-
- Args:
- applied_transform: An instance of an AppliedPTransform that has been fully
- applied prior to 'pending_transform'.
- pending_transform: An instance of PTransform that has yet to be applied to
- the Pipeline.
-
- Returns:
- A fully formatted PTransform label. Example: '/foo/bar/baz'.
- """
- label = '/'.join([applied_transform.full_label, pending_transform.label])
- # Remove leading backslash because the monitoring UI expects names that do
not
- # start with such a character.
- return label if not label.startswith('/') else label[1:]
-
-
-def label_from_callable(fn):
- if hasattr(fn, 'default_label'):
- return fn.default_label()
- elif hasattr(fn, '__name__'):
- if fn.__name__ == '<lambda>':
- return '<lambda at %s:%s>' % (
- os.path.basename(fn.func_code.co_filename),
- fn.func_code.co_firstlineno)
- else:
- return fn.__name__
- else:
- return str(fn)