Repository: beam
Updated Branches:
  refs/heads/master 64102943f -> 00f5fefc8


Remove fn_or_label argument from Map and Filter


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa5270ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa5270ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa5270ff

Branch: refs/heads/master
Commit: fa5270ffffe8391eb97ccd435ea0f6ca7e8788c8
Parents: 6410294
Author: Sourabh Bajaj <[email protected]>
Authored: Wed Feb 8 11:22:42 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Wed Feb 8 16:58:26 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   |  4 +--
 sdks/python/apache_beam/transforms/combiners.py |  2 +-
 sdks/python/apache_beam/transforms/core.py      | 34 ++++++--------------
 .../apache_beam/transforms/ptransform_test.py   |  4 +++
 sdks/python/apache_beam/transforms/util.py      |  8 ++---
 5 files changed, 21 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index c3879dc..3216de4 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1129,8 +1129,8 @@ def model_join_using_side_inputs(
                       '%s' % ','.join(filtered_emails),
                       '%s' % ','.join(filtered_phone_numbers)])
 
-  contact_lines = names | beam.core.Map(
-      "CreateContacts", join_info, AsIter(emails), AsIter(phones))
+  contact_lines = names | 'CreateContacts' >> beam.core.Map(
+      join_info, AsIter(emails), AsIter(phones))
   # [END model_join_using_side_inputs]
   contact_lines | beam.io.WriteToText(output_path)
   p.run()

http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 96fcddd..d4874b7 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -120,7 +120,7 @@ class Count(object):
     def expand(self, pcoll):
       paired_with_void_type = KV[pcoll.element_type, Any]
       return (pcoll
-              | (core.Map('%s:PairWithVoid' % self.label, lambda x: (x, None))
+              | ('%s:PairWithVoid' % self.label >> core.Map(lambda x: (x, None))
                  .with_output_types(paired_with_void_type))
               | core.CombinePerKey(CountCombineFn()))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index f39b17f..7cbca2f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -704,12 +704,11 @@ def FlatMap(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
   return ParDo(label, CallableWrapperDoFn(fn), *args, **kwargs)
 
 
-def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
+def Map(fn, *args, **kwargs):  # pylint: disable=invalid-name
   """Map is like FlatMap except its callable returns only a single element.
 
   Args:
-    fn_or_label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
+    fn: a callable object.
     *args: positional arguments passed to the transform callable.
     **kwargs: keyword arguments passed to the transform callable.
 
@@ -720,20 +719,17 @@ def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
     TypeError: If the fn passed as argument is not a callable. Typical error
       is to pass a DoFn instance which is supported only for ParDo.
   """
-  if isinstance(fn_or_label, str):
-    label, fn, args = fn_or_label, args[0], args[1:]
-  else:
-    label, fn = None, fn_or_label
   if not callable(fn):
     raise TypeError(
         'Map can be used only with callable objects. '
-        'Received %r instead for %s argument.'
-        % (fn, 'first' if label is None else 'second'))
+        'Received %r instead.' % (fn))
   if _fn_takes_side_inputs(fn):
     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   else:
     wrapper = lambda x: [fn(x)]
 
+  label = 'Map(%s)' % ptransform.label_from_callable(fn)
+
   # TODO. What about callable classes?
   if hasattr(fn, '__name__'):
     wrapper.__name__ = fn.__name__
@@ -748,18 +744,14 @@ def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
   wrapper._argspec_fn = fn
   # pylint: enable=protected-access
 
-  if label is None:
-    label = 'Map(%s)' % ptransform.label_from_callable(fn)
-
   return FlatMap(label, wrapper, *args, **kwargs)
 
 
-def Filter(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
+def Filter(fn, *args, **kwargs):  # pylint: disable=invalid-name
   """Filter is a FlatMap with its callable filtering out elements.
 
   Args:
-    fn_or_label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
+    fn: a callable object.
     *args: positional arguments passed to the transform callable.
     **kwargs: keyword arguments passed to the transform callable.
 
@@ -770,17 +762,14 @@ def Filter(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
     TypeError: If the fn passed as argument is not a callable. Typical error
       is to pass a DoFn instance which is supported only for FlatMap.
   """
-  if isinstance(fn_or_label, str):
-    label, fn, args = fn_or_label, args[0], args[1:]
-  else:
-    label, fn = None, fn_or_label
   if not callable(fn):
     raise TypeError(
         'Filter can be used only with callable objects. '
-        'Received %r instead for %s argument.'
-        % (fn, 'first' if label is None else 'second'))
+        'Received %r instead.' % (fn))
   wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else []
 
+  label = 'Filter(%s)' % ptransform.label_from_callable(fn)
+
   # TODO: What about callable classes?
   if hasattr(fn, '__name__'):
     wrapper.__name__ = fn.__name__
@@ -798,9 +787,6 @@ def Filter(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
   wrapper._argspec_fn = fn
   # pylint: enable=protected-access
 
-  if label is None:
-    label = 'Filter(%s)' % ptransform.label_from_callable(fn)
-
   return FlatMap(label, wrapper, *args, **kwargs)
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 9da692c..c83432c 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -727,6 +727,10 @@ class PTransformLabelsTest(unittest.TestCase):
 
     self.check_label(beam.ParDo(MyDoFn()), r'ParDo(MyDoFn)')
 
+  def test_lable_propogation(self):
+    self.check_label('TestMap' >> beam.Map(len), r'TestMap')
+    self.check_label('TestFilter' >> beam.Filter(len), r'TestFilter')
+
 
 class PTransformTestDisplayData(unittest.TestCase):
   def test_map_named_function(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/fa5270ff/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index ad63a02..1f691ca 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -138,7 +138,7 @@ class CoGroupByKey(PTransform):
       if self.pipeline:
         assert pcoll.pipeline == self.pipeline
 
-    return ([pcoll | Map('pair_with_%s' % tag, _pair_tag_with_value, tag)
+    return ([pcoll | 'pair_with_%s' % tag >> Map(_pair_tag_with_value, tag)
              for tag, pcoll in pcolls]
             | Flatten(pipeline=self.pipeline)
             | GroupByKey()
@@ -147,17 +147,17 @@ class CoGroupByKey(PTransform):
 
 def Keys(label='Keys'):  # pylint: disable=invalid-name
   """Produces a PCollection of first elements of 2-tuples in a PCollection."""
-  return Map(label, lambda (k, v): k)
+  return label >> Map(lambda (k, v): k)
 
 
 def Values(label='Values'):  # pylint: disable=invalid-name
   """Produces a PCollection of second elements of 2-tuples in a PCollection."""
-  return Map(label, lambda (k, v): v)
+  return label >> Map(lambda (k, v): v)
 
 
 def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name
   """Produces a PCollection reversing 2-tuples in a PCollection."""
-  return Map(label, lambda (k, v): (v, k))
+  return label >> Map(lambda (k, v): (v, k))
 
 
 @ptransform_fn

Reply via email to