Remove label from Flatten

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5b0d8830
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5b0d8830
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5b0d8830

Branch: refs/heads/master
Commit: 5b0d883033d5544deb169158e79b26c188153cb5
Parents: dec7edf
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Fri Feb 10 11:48:14 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Feb 10 16:59:50 2017 -0800

----------------------------------------------------------------------
 .../runners/direct/consumer_tracking_pipeline_visitor_test.py  | 4 ++--
 sdks/python/apache_beam/transforms/core.py                     | 6 ++----
 sdks/python/apache_beam/transforms/ptransform_test.py          | 2 +-
 3 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5b0d8830/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 51dd1fa..126a7de 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -52,13 +52,13 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
       pass
 
     root_read = Read('read', DummySource())
-    root_flatten = Flatten('flatten', pipeline=self.pipeline)
+    root_flatten = Flatten(pipeline=self.pipeline)
 
     pbegin = pvalue.PBegin(self.pipeline)
     pcoll_create = pbegin | 'create' >> root_create
     pbegin | root_read
     pcoll_create | FlatMap(lambda x: x)
-    [] | root_flatten
+    [] | 'flatten' >> root_flatten
 
     self.pipeline.visit(self.visitor)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5b0d8830/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9da7cf2..2557b7e 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1283,8 +1283,6 @@ class Flatten(PTransform):
   will be empty (but see also kwargs below).
 
   Args:
-    label: name of this transform instance. Useful while monitoring and
-      debugging a pipeline execution.
     **kwargs: Accepts a single named argument "pipeline", which specifies the
       pipeline that "owns" this PTransform. Ordinarily Flatten can obtain this
       information from one of the input PCollections, but if there are none (or
@@ -1292,8 +1290,8 @@ class Flatten(PTransform):
       provide pipeline information and should be considered mandatory.
   """
 
-  def __init__(self, label=None, **kwargs):
-    super(Flatten, self).__init__(label)
+  def __init__(self, **kwargs):
+    super(Flatten, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
       raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())

http://git-wip-us.apache.org/repos/asf/beam/blob/5b0d8830/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 0606645..f7f157f 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -585,7 +585,7 @@ class PTransformTest(unittest.TestCase):
         keys = reduce(operator.or_, [set(p.keys()) for p in pcoll_dicts])
         res = {}
         for k in keys:
-          res[k] = [p[k] for p in pcoll_dicts if k in p] | beam.Flatten(k)
+          res[k] = [p[k] for p in pcoll_dicts if k in p] | k >> beam.Flatten()
         return res
     res = [{'a': [1, 2, 3]},
            {'a': [4, 5, 6], 'b': ['x', 'y', 'z']},

Reply via email to