Remove label from windowInto

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44d6501d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44d6501d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44d6501d

Branch: refs/heads/master
Commit: 44d6501dc73e1aa46a5b1138e92c8aa0a8bfe3f5
Parents: 5b0d883
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Fri Feb 10 11:56:50 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Feb 10 16:59:50 2017 -0800

----------------------------------------------------------------------
 .../examples/cookbook/bigquery_schema.py        |  3 +-
 .../examples/cookbook/bigquery_tornadoes.py     |  3 +-
 .../apache_beam/examples/snippets/snippets.py   | 21 +++----
 .../apache_beam/examples/streaming_wordcount.py |  8 +--
 sdks/python/apache_beam/io/iobase.py            | 20 ++-----
 sdks/python/apache_beam/io/textio_test.py       |  4 +-
 sdks/python/apache_beam/pvalue.py               | 25 --------
 sdks/python/apache_beam/pvalue_test.py          |  6 +-
 .../runners/dataflow/native_io/iobase.py        | 10 +---
 .../consumer_tracking_pipeline_visitor_test.py  |  9 +--
 sdks/python/apache_beam/transforms/core.py      | 16 ++---
 .../python/apache_beam/transforms/ptransform.py | 61 +-------------------
 .../apache_beam/transforms/sideinputs_test.py   |  4 +-
 sdks/python/apache_beam/transforms/util.py      |  4 +-
 14 files changed, 42 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
index 650a886..98aea05 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
@@ -113,8 +113,7 @@ def run(argv=None):
   # pylint: disable=expression-not-assigned
   record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
   records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
-  records | beam.io.Write(
-      'write',
+  records | 'write' >> beam.io.Write(
       beam.io.BigQuerySink(
           known_args.output,
           schema=table_schema,

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 6e1326c..6d79216 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -82,8 +82,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  counts | beam.io.Write(
-      'write',
+  counts | 'write' >> beam.io.Write(
       beam.io.BigQuerySink(
           known_args.output,
           schema='month:INTEGER, tornado_count:INTEGER',

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index f15a089..9ba46cd 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -793,8 +793,8 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
   # [START model_custom_sink_new_ptransform]
   class WriteToKVSink(PTransform):
 
-    def __init__(self, label, url, final_table_name, **kwargs):
-      super(WriteToKVSink, self).__init__(label, **kwargs)
+    def __init__(self, url, final_table_name, **kwargs):
+      super(WriteToKVSink, self).__init__(**kwargs)
       self._url = url
       self._final_table_name = final_table_name
 
@@ -808,8 +808,8 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
   # [START model_custom_sink_use_ptransform]
   p = beam.Pipeline(options=PipelineOptions())
   kvs = p | 'CreateKVs' >> beam.core.Create(KVs)
-  kvs | WriteToKVSink('WriteToSimpleKV',
-                      'http://url_to_simple_kv/', final_table_name)
+  kvs | 'WriteToSimpleKV' >> WriteToKVSink(
+      'http://url_to_simple_kv/', final_table_name)
   # [END model_custom_sink_use_ptransform]
 
   p.run().wait_until_finish()
@@ -903,24 +903,21 @@ def model_bigqueryio():
 
   # [START model_bigqueryio_read]
   p = beam.Pipeline(options=PipelineOptions())
-  weather_data = p | beam.io.Read(
-      'ReadWeatherStations',
+  weather_data = p | 'ReadWeatherStations' >> beam.io.Read(
       beam.io.BigQuerySource(
           'clouddataflow-readonly:samples.weather_stations'))
   # [END model_bigqueryio_read]
 
   # [START model_bigqueryio_query]
   p = beam.Pipeline(options=PipelineOptions())
-  weather_data = p | beam.io.Read(
-      'ReadYearAndTemp',
+  weather_data = p | 'ReadYearAndTemp' >> beam.io.Read(
       beam.io.BigQuerySource(
           query='SELECT year, mean_temp FROM samples.weather_stations'))
   # [END model_bigqueryio_query]
 
   # [START model_bigqueryio_query_standard_sql]
   p = beam.Pipeline(options=PipelineOptions())
-  weather_data = p | beam.io.Read(
-      'ReadYearAndTemp',
+  weather_data = p | 'ReadYearAndTemp' >> beam.io.Read(
       beam.io.BigQuerySource(
           query='SELECT year, mean_temp FROM `samples.weather_stations`',
           use_standard_sql=True))
@@ -933,8 +930,8 @@ def model_bigqueryio():
   # [START model_bigqueryio_write]
   quotes = p | beam.Create(
       [{'source': 'Mahatma Ghandi', 'quote': 'My life is my message.'}])
-  quotes | beam.io.Write(
-      'Write', beam.io.BigQuerySink(
+  quotes | 'Write' >> beam.io.Write(
+      beam.io.BigQuerySink(
           'my-project:output.output_table',
           schema=schema,
           write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/examples/streaming_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 7fb2c81..4b6aecc 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -47,8 +47,8 @@ def run(argv=None):
   p = beam.Pipeline(argv=pipeline_args)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read(
-      'read', beam.io.PubSubSource(known_args.input_topic))
+  lines = p | 'read' >> beam.io.Read(
+      beam.io.PubSubSource(known_args.input_topic))
 
   # Capitalize the characters in each line.
   transformed = (lines
@@ -63,8 +63,8 @@ def run(argv=None):
 
   # Write to PubSub.
   # pylint: disable=expression-not-assigned
-  transformed | beam.io.Write(
-      'pubsub_write', beam.io.PubSubSink(known_args.output_topic))
+  transformed | 'pubsub_write' >> beam.io.Write(
+      beam.io.PubSubSink(known_args.output_topic))
 
   p.run().wait_until_finish()
 

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 3a4aec9..a2153f4 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -645,17 +645,13 @@ class Writer(object):
 class Read(ptransform.PTransform):
   """A transform that reads a PCollection."""
 
-  def __init__(self, *args, **kwargs):
+  def __init__(self, source):
     """Initializes a Read transform.
 
     Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, source) or (source).
+      source: Data source to read from.
     """
-    label, source = self.parse_label_and_arg(args, kwargs, 'source')
-    super(Read, self).__init__(label)
+    super(Read, self).__init__()
     self.source = source
 
   def expand(self, pbegin):
@@ -706,17 +702,13 @@ class Write(ptransform.PTransform):
   native write transform.
   """
 
-  def __init__(self, *args, **kwargs):
+  def __init__(self, sink):
     """Initializes a Write transform.
 
     Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, sink) or (sink).
+      sink: Data sink to write to.
     """
-    label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
-    super(Write, self).__init__(label)
+    super(Write, self).__init__()
     self.sink = sink
 
   def display_data(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index ea417b0..0dedc95 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -668,7 +668,7 @@ class TextSinkTest(_TestCaseWithTempDirCleanUp):
 
   def test_write_dataflow_auto_compression_unsharded(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     pcoll | 'Write' >> WriteToText(  # pylint: disable=expression-not-assigned
         self.path + '.gz',
         shard_name_template='')
@@ -684,7 +684,7 @@ class TextSinkTest(_TestCaseWithTempDirCleanUp):
 
   def test_write_dataflow_header(self):
     pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create('Create', self.lines)
+    pcoll = pipeline | 'Create' >> beam.core.Create(self.lines)
     header_text = 'foo'
     pcoll | 'Write' >> WriteToText(  # pylint: disable=expression-not-assigned
         self.path + '.gz',

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index c87728c..6dc67b0 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -310,27 +310,6 @@ def _cache_view(pipeline, key, view):
   pipeline._view_cache[key] = view  # pylint: disable=protected-access
 
 
-def can_take_label_as_first_argument(callee):
-  """Decorator to allow the "label" kwarg to be passed as the first argument.
-
-  For example, since AsSingleton is annotated with this decorator, this allows
-  the call "AsSingleton(pcoll, label='label1')" to be written more succinctly
-  as "AsSingleton('label1', pcoll)".
-
-  Args:
-    callee: The callable to be called with an optional label argument.
-
-  Returns:
-    Callable that allows (but does not require) a string label as its first
-    argument.
-  """
-  def _inner(maybe_label, *args, **kwargs):
-    if isinstance(maybe_label, basestring):
-      return callee(*args, label=maybe_label, **kwargs)
-    return callee(*((maybe_label,) + args), **kwargs)
-  return _inner
-
-
 def _format_view_label(pcoll):
   # The monitoring UI doesn't like '/' character in transform labels.
   if not pcoll.producer:
@@ -342,7 +321,6 @@ def _format_view_label(pcoll):
 _SINGLETON_NO_DEFAULT = object()
 
 
-@can_take_label_as_first_argument
 def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None):  # pylint: disable=invalid-name
   """Create a SingletonPCollectionView from the contents of input PCollection.
 
@@ -386,7 +364,6 @@ def AsSingleton(pcoll, default_value=_SINGLETON_NO_DEFAULT, label=None):  # pyli
   return view
 
 
-@can_take_label_as_first_argument
 def AsIter(pcoll, label=None):  # pylint: disable=invalid-name
   """Create an IterablePCollectionView from the elements of input PCollection.
 
@@ -417,7 +394,6 @@ def AsIter(pcoll, label=None):  # pylint: disable=invalid-name
   return view
 
 
-@can_take_label_as_first_argument
 def AsList(pcoll, label=None):  # pylint: disable=invalid-name
   """Create a ListPCollectionView from the elements of input PCollection.
 
@@ -448,7 +424,6 @@ def AsList(pcoll, label=None):  # pylint: disable=invalid-name
   return view
 
 
-@can_take_label_as_first_argument
 def AsDict(pcoll, label=None):  # pylint: disable=invalid-name
   """Create a DictPCollectionView from the elements of input PCollection.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/pvalue_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue_test.py b/sdks/python/apache_beam/pvalue_test.py
index 6748f17..6b4b663 100644
--- a/sdks/python/apache_beam/pvalue_test.py
+++ b/sdks/python/apache_beam/pvalue_test.py
@@ -49,10 +49,10 @@ class PValueTest(unittest.TestCase):
     value2 = pipeline | 'create2' >> Create([(1, 1), (2, 2), (3, 3)])
     value3 = pipeline | 'create3' >> Create([(1, 1), (2, 2), (3, 3)])
     self.assertEqual(AsSingleton(value), AsSingleton(value))
-    self.assertEqual(AsSingleton('new', value, default_value=1),
-                     AsSingleton('new', value, default_value=1))
+    self.assertEqual(AsSingleton(value, default_value=1, label='new'),
+                     AsSingleton(value, default_value=1, label='new'))
     self.assertNotEqual(AsSingleton(value),
-                        AsSingleton('new', value, default_value=1))
+                        AsSingleton(value, default_value=1, label='new'))
     self.assertEqual(AsIter(value), AsIter(value))
     self.assertEqual(AsList(value), AsList(value))
     self.assertEqual(AsDict(value2), AsDict(value2))

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index b6eb288..529d414 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -293,17 +293,13 @@ class _NativeWrite(ptransform.PTransform):
   Applying this transform results in a ``pvalue.PDone``.
   """
 
-  def __init__(self, *args, **kwargs):
+  def __init__(self, sink):
     """Initializes a Write transform.
 
     Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, sink) or (sink).
+      sink: Sink to use for the write
     """
-    label, sink = self.parse_label_and_arg(args, kwargs, 'sink')
-    super(_NativeWrite, self).__init__(label)
+    super(_NativeWrite, self).__init__()
     self.sink = sink
 
   def expand(self, pcoll):

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 126a7de..73b897f 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -51,12 +51,12 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
     class DummySource(iobase.BoundedSource):
       pass
 
-    root_read = Read('read', DummySource())
+    root_read = Read(DummySource())
     root_flatten = Flatten(pipeline=self.pipeline)
 
     pbegin = pvalue.PBegin(self.pipeline)
     pcoll_create = pbegin | 'create' >> root_create
-    pbegin | root_read
+    pbegin | 'read' >> root_read
     pcoll_create | FlatMap(lambda x: x)
     [] | 'flatten' >> root_flatten
 
@@ -64,10 +64,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase):
 
     root_transforms = sorted(
         [t.transform for t in self.visitor.root_transforms])
-    print root_transforms
-    print root_read
-    print root_create
-    print root_flatten
+
     self.assertEqual(root_transforms, sorted(
         [root_read, root_create, root_flatten]))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 2557b7e..2efe38d 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -578,7 +578,7 @@ class ParDo(PTransformWithSideInputs):
 
   Args:
       pcoll: a PCollection to be processed.
-      dofn: a DoFn object to be applied to each element of pcoll argument.
+      fn: a DoFn object to be applied to each element of pcoll argument.
       *args: positional arguments passed to the dofn object.
       **kwargs:  keyword arguments passed to the dofn object.
 
@@ -1126,8 +1126,8 @@ class GroupByKey(PTransform):
 class GroupByKeyOnly(PTransform):
   """A group by key transform, ignoring windows."""
 
-  def __init__(self, label=None):
-    super(GroupByKeyOnly, self).__init__(label)
+  def __init__(self):
+    super(GroupByKeyOnly, self).__init__()
 
   def infer_output_type(self, input_type):
     key_type, value_type = trivial_inference.key_value_types(input_type)
@@ -1235,25 +1235,19 @@ class WindowInto(ParDo):
       new_windows = self.windowing.windowfn.assign(context)
       yield WindowedValue(element, context.timestamp, new_windows)
 
-  def __init__(self, *args, **kwargs):
+  def __init__(self, windowfn, *args, **kwargs):
     """Initializes a WindowInto transform.
 
     Args:
-      *args: A tuple of position arguments.
-      **kwargs: A dictionary of keyword arguments.
-
-    The *args, **kwargs are expected to be (label, windowfn) or (windowfn).
-    The optional trigger and accumulation_mode kwargs may also be provided.
+      windowfn: Function to be used for windowing
     """
     triggerfn = kwargs.pop('trigger', None)
     accumulation_mode = kwargs.pop('accumulation_mode', None)
     output_time_fn = kwargs.pop('output_time_fn', None)
-    label, windowfn = self.parse_label_and_arg(args, kwargs, 'windowfn')
     self.windowing = Windowing(windowfn, triggerfn, accumulation_mode,
                                output_time_fn)
     dofn = self.WindowIntoFn(self.windowing)
     super(WindowInto, self).__init__(dofn)
-    self.label = label
 
   def get_windowing(self, unused_inputs):
     return self.windowing

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 994c09b..aca5822 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -204,65 +204,6 @@ class PTransform(WithTypeHints, HasDisplayData):
   def default_label(self):
     return self.__class__.__name__
 
-  @classmethod
-  def parse_label_and_arg(cls, args, kwargs, arg_name):
-    """Parses a tuple of positional arguments into label, arg_name.
-
-    The function is used by functions that take a (label, arg_name) list of
-    parameters and in which first label could be optional even if the arg_name
-    is not passed as a keyword. More specifically the following calling patterns
-    are allowed::
-
-      (value)
-      ('label', value)
-      (arg_name=value)
-      ('label', arg_name=value)
-      (value, label='label')
-      (label='label', arg_name=value)
-
-    Args:
-      args: A tuple of position arguments.
-      kwargs: A dictionary of keyword arguments.
-      arg_name: The name of the second argument.
-
-    Returns:
-      A (label, value) tuple. The label will be the one passed in or one
-      derived from the class name. The value will the corresponding value for
-      the arg_name argument.
-
-    Raises:
-      ValueError: If the label and value cannot be deduced from args and kwargs
-        and also if the label is not a string.
-    """
-    # TODO(robertwb): Fix to not silently drop extra arguments.
-    kw_label = kwargs.get('label', None)
-    kw_value = kwargs.get(arg_name, None)
-
-    if kw_value is not None:
-      value = kw_value
-    else:
-      value = args[1] if len(args) > 1 else args[0] if args else None
-
-    if kw_label is not None:
-      label = kw_label
-    else:
-      # We need to get a label from positional arguments. If we did not get a
-      # keyword value for the arg_name either then expect that a one element
-      # list will provide the value and the label will be derived from the class
-      # name.
-      num_args = len(args)
-      if kw_value is None:
-        label = args[0] if num_args >= 2 else cls.__name__
-      else:
-        label = args[0] if num_args >= 1 else cls.__name__
-
-    if label is None or value is None or not isinstance(label, basestring):
-      raise ValueError(
-          '%s expects a (label, %s) or (%s) argument list '
-          'instead of args=%s, kwargs=%s' % (
-              cls.__name__, arg_name, arg_name, args, kwargs))
-    return label, value
-
   def with_input_types(self, input_type_hint):
     """Annotates the input type of a PTransform with a type-hint.
 
@@ -633,9 +574,9 @@ class CallablePTransform(PTransform):
     return res
 
   def __call__(self, *args, **kwargs):
+    super(CallablePTransform, self).__init__()
     self._args = args
     self._kwargs = kwargs
-    super(CallablePTransform, self).__init__()
     return self
 
   def expand(self, pcoll):

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 9384e7b..4672709 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -250,8 +250,8 @@ class SideInputsTest(unittest.TestCase):
     side_list = pipeline | 'side list' >> beam.Create(a_list)
     results = main_input | beam.FlatMap(
         lambda x, s1, s2: [[x, s1, s2]],
-        beam.pvalue.AsSingleton('si1', side_list, default_value=2),
-        beam.pvalue.AsSingleton('si2', side_list, default_value=3))
+        beam.pvalue.AsSingleton(side_list, default_value=2, label='si1'),
+        beam.pvalue.AsSingleton(side_list, default_value=3, label='si2'))
 
     def  matcher(expected_elem, expected_singleton1, expected_singleton2):
       def match(actual):

http://git-wip-us.apache.org/repos/asf/beam/blob/44d6501d/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 1f691ca..56902dd 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -87,8 +87,8 @@ class CoGroupByKey(PTransform):
       to provide pipeline information, and should be considered mandatory.
   """
 
-  def __init__(self, label=None, **kwargs):
-    super(CoGroupByKey, self).__init__(label)
+  def __init__(self, **kwargs):
+    super(CoGroupByKey, self).__init__()
     self.pipeline = kwargs.pop('pipeline', None)
     if kwargs:
       raise ValueError('Unexpected keyword arguments: %s' % kwargs.keys())

Reply via email to