Repository: beam Updated Branches: refs/heads/master 64102943f -> 00f5fefc8
Remove fn_or_label argument from Map and Filter Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa5270ff Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa5270ff Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa5270ff Branch: refs/heads/master Commit: fa5270ffffe8391eb97ccd435ea0f6ca7e8788c8 Parents: 6410294 Author: Sourabh Bajaj <[email protected]> Authored: Wed Feb 8 11:22:42 2017 -0800 Committer: Ahmet Altay <[email protected]> Committed: Wed Feb 8 16:58:26 2017 -0800 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets.py | 4 +-- sdks/python/apache_beam/transforms/combiners.py | 2 +- sdks/python/apache_beam/transforms/core.py | 34 ++++++-------------- .../apache_beam/transforms/ptransform_test.py | 4 +++ sdks/python/apache_beam/transforms/util.py | 8 ++--- 5 files changed, 21 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index c3879dc..3216de4 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1129,8 +1129,8 @@ def model_join_using_side_inputs( '%s' % ','.join(filtered_emails), '%s' % ','.join(filtered_phone_numbers)]) - contact_lines = names | beam.core.Map( - "CreateContacts", join_info, AsIter(emails), AsIter(phones)) + contact_lines = names | 'CreateContacts' >> beam.core.Map( + join_info, AsIter(emails), AsIter(phones)) # [END model_join_using_side_inputs] contact_lines | beam.io.WriteToText(output_path) p.run() http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/combiners.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 96fcddd..d4874b7 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -120,7 +120,7 @@ class Count(object): def expand(self, pcoll): paired_with_void_type = KV[pcoll.element_type, Any] return (pcoll - | (core.Map('%s:PairWithVoid' % self.label, lambda x: (x, None)) + | ('%s:PairWithVoid' % self.label >> core.Map(lambda x: (x, None)) .with_output_types(paired_with_void_type)) | core.CombinePerKey(CountCombineFn())) http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index f39b17f..7cbca2f 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -704,12 +704,11 @@ def FlatMap(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name return ParDo(label, CallableWrapperDoFn(fn), *args, **kwargs) -def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name +def Map(fn, *args, **kwargs): # pylint: disable=invalid-name """Map is like FlatMap except its callable returns only a single element. Args: - fn_or_label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. + fn: a callable object. *args: positional arguments passed to the transform callable. **kwargs: keyword arguments passed to the transform callable. @@ -720,20 +719,17 @@ def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name TypeError: If the fn passed as argument is not a callable. Typical error is to pass a DoFn instance which is supported only for ParDo. """ - if isinstance(fn_or_label, str): - label, fn, args = fn_or_label, args[0], args[1:] - else: - label, fn = None, fn_or_label if not callable(fn): raise TypeError( 'Map can be used only with callable objects. ' - 'Received %r instead for %s argument.' - % (fn, 'first' if label is None else 'second')) + 'Received %r instead.' % (fn)) if _fn_takes_side_inputs(fn): wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] else: wrapper = lambda x: [fn(x)] + label = 'Map(%s)' % ptransform.label_from_callable(fn) + # TODO. What about callable classes? if hasattr(fn, '__name__'): wrapper.__name__ = fn.__name__ @@ -748,18 +744,14 @@ def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name wrapper._argspec_fn = fn # pylint: enable=protected-access - if label is None: - label = 'Map(%s)' % ptransform.label_from_callable(fn) - return FlatMap(label, wrapper, *args, **kwargs) -def Filter(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name +def Filter(fn, *args, **kwargs): # pylint: disable=invalid-name """Filter is a FlatMap with its callable filtering out elements. Args: - fn_or_label: name of this transform instance. Useful while monitoring and - debugging a pipeline execution. + fn: a callable object. *args: positional arguments passed to the transform callable. **kwargs: keyword arguments passed to the transform callable. @@ -770,17 +762,14 @@ def Filter(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name TypeError: If the fn passed as argument is not a callable. Typical error is to pass a DoFn instance which is supported only for FlatMap. """ - if isinstance(fn_or_label, str): - label, fn, args = fn_or_label, args[0], args[1:] - else: - label, fn = None, fn_or_label if not callable(fn): raise TypeError( 'Filter can be used only with callable objects. ' - 'Received %r instead for %s argument.' - % (fn, 'first' if label is None else 'second')) + 'Received %r instead.' % (fn)) wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else [] + label = 'Filter(%s)' % ptransform.label_from_callable(fn) + # TODO: What about callable classes? if hasattr(fn, '__name__'): wrapper.__name__ = fn.__name__ @@ -798,9 +787,6 @@ def Filter(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name wrapper._argspec_fn = fn # pylint: enable=protected-access - if label is None: - label = 'Filter(%s)' % ptransform.label_from_callable(fn) - return FlatMap(label, wrapper, *args, **kwargs) http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 9da692c..c83432c 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -727,6 +727,10 @@ class PTransformLabelsTest(unittest.TestCase): self.check_label(beam.ParDo(MyDoFn()), r'ParDo(MyDoFn)') + def test_lable_propogation(self): + self.check_label('TestMap' >> beam.Map(len), r'TestMap') + self.check_label('TestFilter' >> beam.Filter(len), r'TestFilter') + class PTransformTestDisplayData(unittest.TestCase): def test_map_named_function(self): http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index ad63a02..1f691ca 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -138,7 +138,7 @@ class CoGroupByKey(PTransform): if self.pipeline: assert pcoll.pipeline == self.pipeline - return ([pcoll | Map('pair_with_%s' % tag, _pair_tag_with_value, tag) + return ([pcoll | 'pair_with_%s' % tag >> Map(_pair_tag_with_value, tag) for tag, pcoll in pcolls] | Flatten(pipeline=self.pipeline) | GroupByKey() @@ -147,17 +147,17 @@ class CoGroupByKey(PTransform): def Keys(label='Keys'): # pylint: disable=invalid-name """Produces a PCollection of first elements of 2-tuples in a PCollection.""" - return Map(label, lambda (k, v): k) + return label >> Map(lambda (k, v): k) def Values(label='Values'): # pylint: disable=invalid-name """Produces a PCollection of second elements of 2-tuples in a PCollection.""" - return Map(label, lambda (k, v): v) + return label >> Map(lambda (k, v): v) def KvSwap(label='KvSwap'): # pylint: disable=invalid-name """Produces a PCollection reversing 2-tuples in a PCollection.""" - return Map(label, lambda (k, v): (v, k)) + return label >> Map(lambda (k, v): (v, k)) @ptransform_fn
