http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index a747112..0439fe1 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -39,16 +39,16 @@ class CombineTest(unittest.TestCase):
     size = len(vals)
 
     # First for global combines.
-    pcoll = pipeline | Create('start', vals)
-    result_mean = pcoll | combine.Mean.Globally('mean')
-    result_count = pcoll | combine.Count.Globally('count')
+    pcoll = pipeline | 'start' >> Create(vals)
+    result_mean = pcoll | 'mean' >> combine.Mean.Globally()
+    result_count = pcoll | 'count' >> combine.Count.Globally()
     assert_that(result_mean, equal_to([mean]), label='assert:mean')
     assert_that(result_count, equal_to([size]), label='assert:size')
 
     # Again for per-key combines.
-    pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals])
-    result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey')
-    result_key_count = pcoll | combine.Count.PerKey('count-perkey')
+    pcoll = pipeline | 'start-perkey' >> Create([('a', x) for x in vals])
+    result_key_mean = pcoll | 'mean-perkey' >> combine.Mean.PerKey()
+    result_key_count = pcoll | 'count-perkey' >> combine.Count.PerKey()
     assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean')
     assert_that(result_key_count, equal_to([('a', size)]), label='key:size')
     pipeline.run()
@@ -66,9 +66,9 @@ class CombineTest(unittest.TestCase):
              9: 'nniiinne'}
 
     # First for global combines.
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | combine.Top.Largest('top', 5)
-    result_bot = pcoll | combine.Top.Smallest('bot', 4)
+    pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+    result_top = pcoll | 'top' >> combine.Top.Largest(5)
+    result_bot = pcoll | 'bot' >> combine.Top.Smallest(4)
     result_cmp = pcoll | combine.Top.Of(
         'cmp',
         6,
@@ -81,8 +81,8 @@ class CombineTest(unittest.TestCase):
     # Again for per-key combines.
     pcoll = pipeline | Create(
         'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5)
-    result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4)
+    result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
+    result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
     result_key_cmp = pcoll | combine.Top.PerKey(
         'cmp-perkey',
         6,
@@ -99,15 +99,15 @@ class CombineTest(unittest.TestCase):
   def test_top_shorthands(self):
     pipeline = Pipeline('DirectPipelineRunner')
 
-    pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-    result_top = pcoll | beam.CombineGlobally('top', combine.Largest(5))
-    result_bot = pcoll | beam.CombineGlobally('bot', combine.Smallest(4))
+    pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+    result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
+    result_bot = pcoll | 'bot' >> beam.CombineGlobally(combine.Smallest(4))
     assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
     assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
 
     pcoll = pipeline | Create(
         'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-    result_ktop = pcoll | beam.CombinePerKey('top-perkey', combine.Largest(5))
+    result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
     result_kbot = pcoll | beam.CombinePerKey(
         'bot-perkey', combine.Smallest(4))
     assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
@@ -119,7 +119,7 @@ class CombineTest(unittest.TestCase):
     # First test global samples (lots of them).
     for ix in xrange(300):
       pipeline = Pipeline('DirectPipelineRunner')
-      pcoll = pipeline | Create('start', [1, 1, 2, 2])
+      pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
       result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
 
       def matcher():
@@ -141,7 +141,7 @@ class CombineTest(unittest.TestCase):
     pcoll = pipeline | Create(
         'start-perkey',
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
-    result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
+    result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
 
     def matcher():
       def match(actual):
@@ -158,7 +158,7 @@ class CombineTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     result = (
         p
-        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+        | 'a' >> Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
         | beam.CombineGlobally(combine.TupleCombineFn(max,
                                                       combine.MeanCombineFn(),
                                                       sum)).without_defaults())
@@ -179,8 +179,8 @@ class CombineTest(unittest.TestCase):
   def test_to_list_and_to_dict(self):
     pipeline = Pipeline('DirectPipelineRunner')
     the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
-    pcoll = pipeline | Create('start', the_list)
-    result = pcoll | combine.ToList('to list')
+    pcoll = pipeline | 'start' >> Create(the_list)
+    result = pcoll | 'to list' >> combine.ToList()
 
     def matcher(expected):
       def match(actual):
@@ -191,8 +191,8 @@ class CombineTest(unittest.TestCase):
 
     pipeline = Pipeline('DirectPipelineRunner')
     pairs = [(1, 2), (3, 4), (5, 6)]
-    pcoll = pipeline | Create('start-pairs', pairs)
-    result = pcoll | combine.ToDict('to dict')
+    pcoll = pipeline | 'start-pairs' >> Create(pairs)
+    result = pcoll | 'to dict' >> combine.ToDict()
 
     def matcher():
       def match(actual):
@@ -221,8 +221,8 @@ class CombineTest(unittest.TestCase):
         return main | Map(lambda _, s: s, side)
 
     p = Pipeline('DirectPipelineRunner')
-    result1 = p | Create('label1', []) | CombineWithSideInput('L1')
-    result2 = p | Create('label2', [1, 2, 3, 4]) | CombineWithSideInput('L2')
+    result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput()
+    result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput()
     assert_that(result1, equal_to([0]), label='r1')
     assert_that(result2, equal_to([10]), label='r2')
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index b288321..44a6d29 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -814,12 +814,12 @@ class CombineGlobally(PTransform):
         return transform
 
     combined = (pcoll
-                | add_input_types(Map('KeyWithVoid', lambda v: (None, v))
+                | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v))
                                   .with_output_types(
                                       KV[None, pcoll.element_type]))
                 | CombinePerKey(
                     'CombinePerKey', self.fn, *self.args, **self.kwargs)
-                | Map('UnKey', lambda (k, v): v))
+                | 'UnKey' >> Map(lambda (k, v): v))
 
     if not self.has_defaults and not self.as_view:
       return combined
@@ -851,8 +851,8 @@ class CombineGlobally(PTransform):
         else:
           return transform
       return (pcoll.pipeline
-              | Create('DoOnce', [None])
-              | typed(Map('InjectDefault', lambda _, s: s, view)))
+              | 'DoOnce' >> Create([None])
+              | 'InjectDefault' >> typed(Map(lambda _, s: s, view)))
 
 
 class CombinePerKey(PTransformWithSideInputs):
@@ -1045,9 +1045,9 @@ class GroupByKey(PTransform):
       gbk_output_type = KV[key_type, Iterable[value_type]]
 
       return (pcoll
-              | (ParDo('reify_windows', self.ReifyWindows())
+              | 'reify_windows' >> (ParDo(self.ReifyWindows())
                  .with_output_types(reify_output_type))
-              | (GroupByKeyOnly('group_by_key')
+              | 'group_by_key' >> (GroupByKeyOnly()
                  .with_input_types(reify_output_type)
                  .with_output_types(gbk_input_type))
               | (ParDo('group_by_window',
@@ -1056,8 +1056,8 @@ class GroupByKey(PTransform):
                  .with_output_types(gbk_output_type)))
     else:
       return (pcoll
-              | ParDo('reify_windows', self.ReifyWindows())
-              | GroupByKeyOnly('group_by_key')
+              | 'reify_windows' >> ParDo(self.ReifyWindows())
+              | 'group_by_key' >> GroupByKeyOnly()
               | ParDo('group_by_window',
                       self.GroupAlsoByWindow(pcoll.windowing)))
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index b652bca..6eb28b0 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -691,7 +691,7 @@ def ptransform_fn(fn):
   With either method the custom PTransform can be used in pipelines as if
   it were one of the "native" PTransforms::
 
-    result_pcoll = input_pcoll | CustomMapper('label', somefn)
+    result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn)
 
   Note that for both solutions the underlying implementation of the pipe
   operator (i.e., `|`) will inject the pcoll argument in its proper place

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index feb081c..8121c1e 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -53,12 +53,12 @@ class PTransformTest(unittest.TestCase):
                      str(PTransform()))
 
     pa = Pipeline('DirectPipelineRunner')
-    res = pa | beam.Create('a_label', [1, 2])
+    res = pa | 'a_label' >> beam.Create([1, 2])
     self.assertEqual('<Create(PTransform) label=[a_label]>',
                      str(res.producer.transform))
 
     pc = Pipeline('DirectPipelineRunner')
-    res = pc | beam.Create('with_inputs', [1, 2])
+    res = pc | 'with_inputs' >> beam.Create([1, 2])
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
     self.assertEqual(
@@ -66,7 +66,7 @@ class PTransformTest(unittest.TestCase):
         str(inputs_tr))
 
     pd = Pipeline('DirectPipelineRunner')
-    res = pd | beam.Create('with_sidei', [1, 2])
+    res = pd | 'with_sidei' >> beam.Create([1, 2])
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
     self.assertEqual(
@@ -110,8 +110,8 @@ class PTransformTest(unittest.TestCase):
         return [context.element + addon]
 
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    result = pcoll | beam.ParDo('do', AddNDoFn(), 10)
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
@@ -122,21 +122,21 @@ class PTransformTest(unittest.TestCase):
         pass
 
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     with self.assertRaises(ValueError):
-      pcoll | beam.ParDo('do', MyDoFn)  # Note the lack of ()'s
+      pcoll | 'do' >> beam.ParDo(MyDoFn)  # Note the lack of ()'s
 
   def test_do_with_callable(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    result = pcoll | beam.FlatMap('do', lambda x, addon: [x + addon], 10)
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10)
     assert_that(result, equal_to([11, 12, 13]))
     pipeline.run()
 
   def test_do_with_side_input_as_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    side = pipeline | beam.Create('side', [10])
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
+    side = pipeline | 'side' >> beam.Create([10])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
         'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side))
     assert_that(result, equal_to([11, 12, 13]))
@@ -144,8 +144,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_side_input_as_keyword_arg(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    side = pipeline | beam.Create('side', [10])
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
+    side = pipeline | 'side' >> beam.Create([10])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
     result = pcoll | beam.FlatMap(
         'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
     assert_that(result, equal_to([11, 12, 13]))
@@ -153,8 +153,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_do_fn_returning_string_raises_warning(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ['2', '9', '3'])
-    pcoll | beam.FlatMap('do', lambda x: x + '1')
+    pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
+    pcoll | 'do' >> beam.FlatMap(lambda x: x + '1')
 
     # Since the DoFn directly returns a string we should get an error warning
     # us.
@@ -167,8 +167,8 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_do_fn_returning_dict_raises_warning(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ['2', '9', '3'])
-    pcoll | beam.FlatMap('do', lambda x: {x: '1'})
+    pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3'])
+    pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'})
 
     # Since the DoFn directly returns a dict we should get an error warning
     # us.
@@ -181,9 +181,9 @@ class PTransformTest(unittest.TestCase):
 
   def test_do_with_side_outputs_maintains_unique_name(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    r1 = pcoll | beam.FlatMap('a', lambda x: [x + 1]).with_outputs(main='m')
-    r2 = pcoll | beam.FlatMap('b', lambda x: [x + 2]).with_outputs(main='m')
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m')
+    r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m')
     assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
     assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
     pipeline.run()
@@ -194,8 +194,8 @@ class PTransformTest(unittest.TestCase):
     def incorrect_par_do_fn(x):
       return x + 5
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [2, 9, 3])
-    pcoll | beam.FlatMap('do', incorrect_par_do_fn)
+    pcoll = pipeline | 'start' >> beam.Create([2, 9, 3])
+    pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn)
     # It's a requirement that all user-defined functions to a ParDo return
     # an iterable.
     with self.assertRaises(typehints.TypeCheckError) as cm:
@@ -215,8 +215,8 @@ class PTransformTest(unittest.TestCase):
       def finish_bundle(self, c):
         yield 'finish'
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3])
-    result = pcoll | beam.ParDo('do', MyDoFn())
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3])
+    result = pcoll | 'do' >> beam.ParDo(MyDoFn())
 
     # May have many bundles, but each has a start and finish.
     def  matcher():
@@ -230,7 +230,7 @@ class PTransformTest(unittest.TestCase):
 
   def test_filter(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [1, 2, 3, 4])
+    pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4])
     result = pcoll | beam.Filter(
         'filter', lambda x: x % 2 == 0)
     assert_that(result, equal_to([2, 4]))
@@ -256,15 +256,15 @@ class PTransformTest(unittest.TestCase):
   def test_combine_with_combine_fn(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
-    result = pcoll | beam.CombineGlobally('mean', self._MeanCombineFn())
+    pcoll = pipeline | 'start' >> beam.Create(vals)
+    result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn())
     assert_that(result, equal_to([sum(vals) / len(vals)]))
     pipeline.run()
 
   def test_combine_with_callable(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
+    pcoll = pipeline | 'start' >> beam.Create(vals)
     result = pcoll | beam.CombineGlobally(sum)
     assert_that(result, equal_to([sum(vals)]))
     pipeline.run()
@@ -272,8 +272,8 @@ class PTransformTest(unittest.TestCase):
   def test_combine_with_side_input_as_arg(self):
     values = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', values)
-    divisor = pipeline | beam.Create('divisor', [2])
+    pcoll = pipeline | 'start' >> beam.Create(values)
+    divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombineGlobally(
         'max',
         # Multiples of divisor only.
@@ -287,9 +287,9 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                              [('b', x) for x in vals_2]))
-    result = pcoll | beam.CombinePerKey('mean', self._MeanCombineFn())
+    result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn())
     assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)),
                                   ('b', sum(vals_2) / len(vals_2))]))
     pipeline.run()
@@ -298,7 +298,7 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                              [('b', x) for x in vals_2]))
     result = pcoll | beam.CombinePerKey(sum)
     assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))]))
@@ -308,9 +308,9 @@ class PTransformTest(unittest.TestCase):
     vals_1 = [1, 2, 3, 4, 5, 6, 7]
     vals_2 = [2, 4, 6, 8, 10, 12, 14]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] +
+    pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] +
                                              [('b', x) for x in vals_2]))
-    divisor = pipeline | beam.Create('divisor', [2])
+    divisor = pipeline | 'divisor' >> beam.Create([2])
     result = pcoll | beam.CombinePerKey(
         lambda vals, d: max(v for v in vals if v % d == 0),
         pvalue.AsSingleton(divisor))  # Multiples of divisor only.
@@ -323,7 +323,7 @@ class PTransformTest(unittest.TestCase):
     pipeline = Pipeline('DirectPipelineRunner')
     pcoll = pipeline | beam.Create(
         'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
-    result = pcoll | beam.GroupByKey('group')
+    result = pcoll | 'group' >> beam.GroupByKey()
     assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
     pipeline.run()
 
@@ -335,9 +335,9 @@ class PTransformTest(unittest.TestCase):
         return (context.element % 3) + offset
 
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     # Attempt nominal partition operation.
-    partitions = pcoll | beam.Partition('part1', SomePartitionFn(), 4, 1)
+    partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1)
     assert_that(partitions[0], equal_to([]))
     assert_that(partitions[1], equal_to([0, 3, 6]), label='p1')
     assert_that(partitions[2], equal_to([1, 4, 7]), label='p2')
@@ -347,14 +347,14 @@ class PTransformTest(unittest.TestCase):
     # Check that a bad partition label will yield an error. For the
     # DirectPipelineRunner, this error manifests as an exception.
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
-    partitions = pcoll | beam.Partition('part2', SomePartitionFn(), 4, 10000)
+    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
+    partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000)
     with self.assertRaises(ValueError):
       pipeline.run()
 
   def test_partition_with_callable(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8])
+    pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8])
     partitions = (
         pcoll | beam.Partition(
             'part',
@@ -370,49 +370,49 @@ class PTransformTest(unittest.TestCase):
     """Regression test for an issue with how partitions are handled."""
     pipeline = Pipeline('DirectPipelineRunner')
     contents = [('aa', 1), ('bb', 2), ('aa', 2)]
-    created = pipeline | beam.Create('A', contents)
-    partitioned = created | beam.Partition('B', lambda x, n: len(x) % n, 3)
-    flattened = partitioned | beam.Flatten('C')
-    grouped = flattened | beam.GroupByKey('D')
+    created = pipeline | 'A' >> beam.Create(contents)
+    partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
+    flattened = partitioned | 'C' >> beam.Flatten()
+    grouped = flattened | 'D' >> beam.GroupByKey()
     assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
     pipeline.run()
 
   def test_flatten_pcollections(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3])
-    pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7])
-    result = (pcoll_1, pcoll_2) | beam.Flatten('flatten')
+    pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
+    pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
+    result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten()
     assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
     pipeline.run()
 
   def test_flatten_no_pcollections(self):
     pipeline = Pipeline('DirectPipelineRunner')
     with self.assertRaises(ValueError):
-      () | beam.Flatten('pipeline arg missing')
-    result = () | beam.Flatten('empty', pipeline=pipeline)
+      () | 'pipeline arg missing' >> beam.Flatten()
+    result = () | 'empty' >> beam.Flatten(pipeline=pipeline)
     assert_that(result, equal_to([]))
     pipeline.run()
 
   def test_flatten_pcollections_in_iterable(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3])
-    pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7])
+    pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3])
+    pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7])
     result = ([pcoll for pcoll in (pcoll_1, pcoll_2)]
-              | beam.Flatten('flatten'))
+              | 'flatten' >> beam.Flatten())
     assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))
     pipeline.run()
 
   def test_flatten_input_type_must_be_iterable(self):
     # Inputs to flatten *must* be an iterable.
     with self.assertRaises(ValueError):
-      4 | beam.Flatten('flatten')
+      4 | 'flatten' >> beam.Flatten()
 
   def test_flatten_input_type_must_be_iterable_of_pcolls(self):
     # Inputs to flatten *must* be an iterable of PCollections.
     with self.assertRaises(TypeError):
-      {'l': 'test'} | beam.Flatten('flatten')
+      {'l': 'test'} | 'flatten' >> beam.Flatten()
     with self.assertRaises(TypeError):
-      set([1, 2, 3]) | beam.Flatten('flatten')
+      set([1, 2, 3]) | 'flatten' >> beam.Flatten()
 
   def test_co_group_by_key_on_list(self):
     pipeline = Pipeline('DirectPipelineRunner')
@@ -420,7 +420,7 @@ class PTransformTest(unittest.TestCase):
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
         'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = (pcoll_1, pcoll_2) | beam.CoGroupByKey('cgbk')
+    result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey()
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -433,7 +433,7 @@ class PTransformTest(unittest.TestCase):
     pcoll_2 = pipeline | beam.Create(
         'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
     result = ([pc for pc in (pcoll_1, pcoll_2)]
-              | beam.CoGroupByKey('cgbk'))
+              | 'cgbk' >> beam.CoGroupByKey())
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -445,7 +445,7 @@ class PTransformTest(unittest.TestCase):
         'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | beam.Create(
         'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey('cgbk')
+    result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey()
     assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
                                   ('b', {'X': [3], 'Y': []}),
                                   ('c', {'X': [4], 'Y': [7, 8]})]))
@@ -453,10 +453,10 @@ class PTransformTest(unittest.TestCase):
 
   def test_group_by_key_input_must_be_kv_pairs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcolls = pipeline | beam.Create('A', [1, 2, 3, 4, 5])
+    pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5])
 
     with self.assertRaises(typehints.TypeCheckError) as e:
-      pcolls | beam.GroupByKey('D')
+      pcolls | 'D' >> beam.GroupByKey()
       pipeline.run()
 
     self.assertStartswith(
@@ -466,9 +466,9 @@ class PTransformTest(unittest.TestCase):
 
   def test_group_by_key_only_input_must_be_kv_pairs(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcolls = pipeline | beam.Create('A', ['a', 'b', 'f'])
+    pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f'])
     with self.assertRaises(typehints.TypeCheckError) as cm:
-      pcolls | beam.GroupByKeyOnly('D')
+      pcolls | 'D' >> beam.GroupByKeyOnly()
       pipeline.run()
 
     expected_error_prefix = ('Input type hint violation at D: expected '
@@ -506,20 +506,20 @@ class PTransformTest(unittest.TestCase):
     t = (beam.Map(lambda x: (x, 1))
          | beam.GroupByKey()
          | beam.Map(lambda (x, ones): (x, sum(ones))))
-    result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t
+    result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
     assert_that(result, equal_to([('a', 2), ('b', 1)]))
     pipeline.run()
 
   def test_apply_to_list(self):
     self.assertItemsEqual(
-        [1, 2, 3], [0, 1, 2] | beam.Map('add_one', lambda x: x + 1))
-    self.assertItemsEqual([1], [0, 1, 2] | beam.Filter('odd', lambda x: x % 2))
+        [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1))
+    self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2))
     self.assertItemsEqual([1, 2, 100, 3],
-                          ([1, 2, 3], [100]) | beam.Flatten('flat'))
+                          ([1, 2, 3], [100]) | 'flat' >> beam.Flatten())
     join_input = ([('k', 'a')],
                   [('k', 'b'), ('k', 'c')])
     self.assertItemsEqual([('k', (['a'], ['b', 'c']))],
-                          join_input | beam.CoGroupByKey('join'))
+                          join_input | 'join' >> beam.CoGroupByKey())
 
   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):
@@ -583,7 +583,7 @@ class PTransformLabelsTest(unittest.TestCase):
     gbk = beam.GroupByKey('gbk')
     map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones)))
     t = (map1 | gbk | map2)
-    result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t
+    result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t
     self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels)
     self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels)
     self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels)
@@ -592,7 +592,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_apply_custom_transform_without_label(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform()
     result = pipeline.apply(custom, pcoll)
     self.assertTrue('CustomTransform' in pipeline.applied_labels)
@@ -602,7 +602,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_apply_custom_transform_with_label(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     custom = PTransformLabelsTest.CustomTransform('*custom*')
     result = pipeline.apply(custom, pcoll)
     self.assertTrue('*custom*' in pipeline.applied_labels)
@@ -613,7 +613,7 @@ class PTransformLabelsTest(unittest.TestCase):
   def test_combine_without_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
+    pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally(sum)
     result = pcoll | combine
     self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels)
@@ -622,7 +622,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def test_apply_ptransform_using_decorator(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('pcoll', [1, 2, 3])
+    pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3])
     sample = SamplePTransform('*sample*')
     _ = pcoll | sample
     self.assertTrue('*sample*' in pipeline.applied_labels)
@@ -633,7 +633,7 @@ class PTransformLabelsTest(unittest.TestCase):
   def test_combine_with_label(self):
     vals = [1, 2, 3, 4, 5, 6, 7]
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | beam.Create('start', vals)
+    pcoll = pipeline | 'start' >> beam.Create(vals)
     combine = beam.CombineGlobally('*sum*', sum)
     result = pcoll | combine
     self.assertTrue('*sum*' in pipeline.applied_labels)
@@ -642,7 +642,7 @@ class PTransformLabelsTest(unittest.TestCase):
 
   def check_label(self, ptransform, expected_label):
     pipeline = Pipeline('DirectPipelineRunner')
-    pipeline | beam.Create('start', [('a', 1)]) | ptransform
+    pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform
     actual_label = sorted(pipeline.applied_labels - {'start'})[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
 
@@ -679,8 +679,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         return [context.element + five]
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.ParDo('add', AddWithFive(), 5))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'add' >> beam.ParDo(AddWithFive(), 5))
 
     assert_that(d, equal_to([6, 7, 8]))
     self.p.run()
@@ -694,8 +694,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('t', [1, 2, 3]).with_output_types(int)
-       | beam.ParDo('upper', ToUpperCaseWithPrefix(), 'hello'))
+       | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello'))
 
     self.assertEqual("Type hint violation for 'upper': "
                      "requires <type 'str'> but got <type 'int'> for context",
@@ -711,8 +711,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         return [context.element + num]
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.ParDo('add', AddWithNum(), 5))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'add' >> beam.ParDo(AddWithNum(), 5))
 
     assert_that(d, equal_to([6, 7, 8]))
     self.p.run()
@@ -728,8 +728,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('t', ['1', '2', '3']).with_output_types(str)
-       | beam.ParDo('add', AddWithNum(), 5))
+       | 't' >> beam.Create(['1', '2', '3']).with_output_types(str)
+       | 'add' >> beam.ParDo(AddWithNum(), 5))
       self.p.run()
 
     self.assertEqual("Type hint violation for 'add': "
@@ -746,8 +746,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # will receive a str instead, which should result in a raised exception.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', ['b', 'a', 'r']).with_output_types(str)
-       | beam.FlatMap('to str', int_to_str))
+       | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str)
+       | 'to str' >> beam.FlatMap(int_to_str))
 
     self.assertEqual("Type hint violation for 'to str': "
                      "requires <type 'int'> but got <type 'str'> for a",
@@ -761,8 +761,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # If this type-checks than no error should be raised.
     d = (self.p
-         | beam.Create('t', ['t', 'e', 's', 't']).with_output_types(str)
-         | beam.FlatMap('case', to_all_upper_case))
+         | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'case' >> beam.FlatMap(to_all_upper_case))
     assert_that(d, equal_to(['T', 'E', 'S', 'T']))
     self.p.run()
 
@@ -775,10 +775,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # expecting pcoll's of type str instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-       | (beam.FlatMap('score', lambda x: [1] if x == 't' else [2])
+       | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+       | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2])
           .with_input_types(str).with_output_types(int))
-       | (beam.FlatMap('upper', lambda x: [x.upper()])
+       | 'upper' >> (beam.FlatMap(lambda x: [x.upper()])
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Type hint violation for 'upper': "
@@ -788,10 +788,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_pardo_properly_type_checks_using_type_hint_methods(self):
     # Pipeline should be created successfully without an error
     d = (self.p
-         | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-         | beam.FlatMap('dup', lambda x: [x + x])
+         | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'dup' >> beam.FlatMap(lambda x: [x + x])
          .with_input_types(str).with_output_types(str)
-         | beam.FlatMap('upper', lambda x: [x.upper()])
+         | 'upper' >> beam.FlatMap(lambda x: [x.upper()])
          .with_input_types(str).with_output_types(str))
 
     assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT']))
@@ -802,8 +802,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # int's, while Map is expecting one of str.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
-       | beam.Map('upper', lambda x: x.upper())
+       | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'upper' >> beam.Map(lambda x: x.upper())
        .with_input_types(str).with_output_types(str))
 
     self.assertEqual("Type hint violation for 'upper': "
@@ -813,8 +813,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_map_properly_type_checks_using_type_hints_methods(self):
     # No error should be raised if this type-checks properly.
     d = (self.p
-         | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
-         | beam.Map('to_str', lambda x: str(x))
+         | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+         | 'to_str' >> beam.Map(lambda x: str(x))
          .with_input_types(int).with_output_types(str))
     assert_that(d, equal_to(['1', '2', '3', '4']))
     self.p.run()
@@ -829,8 +829,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # However, 'Map' should detect that Create has hinted an int instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', [1, 2, 3, 4]).with_output_types(int)
-       | beam.Map('upper', upper))
+       | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'upper' >> beam.Map(upper))
 
     self.assertEqual("Type hint violation for 'upper': "
                      "requires <type 'str'> but got <type 'int'> for s",
@@ -844,8 +844,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # If this type-checks than no error should be raised.
     d = (self.p
-         | beam.Create('bools', [True, False, True]).with_output_types(bool)
-         | beam.Map('to_ints', bool_to_int))
+         | 'bools' >> beam.Create([True, False, True]).with_output_types(bool)
+         | 'to_ints' >> beam.Map(bool_to_int))
     assert_that(d, equal_to([1, 0, 1]))
     self.p.run()
 
@@ -854,10 +854,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # incoming.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
-       | beam.Map('lower', lambda x: x.lower())
+       | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+       | 'lower' >> beam.Map(lambda x: x.lower())
        .with_input_types(str).with_output_types(str)
-       | beam.Filter('below 3', lambda x: x < 3).with_input_types(int))
+       | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
 
     self.assertEqual("Type hint violation for 'below 3': "
                      "requires <type 'int'> but got <type 'str'> for x",
@@ -866,10 +866,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_filter_type_checks_using_type_hints_method(self):
     # No error should be raised if this type-checks properly.
     d = (self.p
-         | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str)
-         | beam.Map('to int', lambda x: int(x))
+         | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str)
+         | 'to int' >> beam.Map(lambda x: int(x))
          .with_input_types(str).with_output_types(int)
-         | beam.Filter('below 3', lambda x: x < 3).with_input_types(int))
+         | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int))
     assert_that(d, equal_to([1, 2]))
     self.p.run()
 
@@ -881,8 +881,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Func above was hinted to only take a float, yet an int will be passed.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('ints', [1, 2, 3, 4]).with_output_types(int)
-       | beam.Filter('half', more_than_half))
+       | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'half' >> beam.Filter(more_than_half))
 
     self.assertEqual("Type hint violation for 'half': "
                      "requires <type 'float'> but got <type 'int'> for a",
@@ -896,17 +896,17 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     # Filter should deduce that it returns the same type that it takes.
     (self.p
-     | beam.Create('str', range(5)).with_output_types(int)
-     | beam.Filter('half', half)
-     | beam.Map('to bool', lambda x: bool(x))
+     | 'str' >> beam.Create(range(5)).with_output_types(int)
+     | 'half' >> beam.Filter(half)
+     | 'to bool' >> beam.Map(lambda x: bool(x))
      .with_input_types(int).with_output_types(bool))
 
   def test_group_by_key_only_output_type_deduction(self):
     d = (self.p
-         | beam.Create('str', ['t', 'e', 's', 't']).with_output_types(str)
-         | (beam.Map('pair', lambda x: (x, ord(x)))
+         | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'pair' >> (beam.Map(lambda x: (x, ord(x)))
             .with_output_types(typehints.KV[str, str]))
-         | beam.GroupByKeyOnly('O'))
+         | 'O' >> beam.GroupByKeyOnly())
 
     # Output type should correctly be deduced.
     # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -915,10 +915,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_group_by_key_output_type_deduction(self):
     d = (self.p
-         | beam.Create('str', range(20)).with_output_types(int)
-         | (beam.Map('pair negative', lambda x: (x % 5, -x))
+         | 'str' >> beam.Create(range(20)).with_output_types(int)
+         | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x))
             .with_output_types(typehints.KV[int, int]))
-         | beam.GroupByKey('T'))
+         | 'T' >> beam.GroupByKey())
 
     # Output type should correctly be deduced.
     # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
@@ -929,8 +929,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # GBK will be passed raw int's here instead of some form of KV[A, B].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('s', [1, 2, 3]).with_output_types(int)
-       | beam.GroupByKeyOnly('F'))
+       | 's' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'F' >> beam.GroupByKeyOnly())
 
     self.assertEqual("Input type hint violation at F: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -942,9 +942,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # aliased to Tuple[int, str].
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | (beam.Create('s', range(5))
+       | 's' >> (beam.Create(range(5))
           .with_output_types(typehints.Iterable[int]))
-       | beam.GroupByKey('T'))
+       | 'T' >> beam.GroupByKey())
 
     self.assertEqual("Input type hint violation at T: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -958,8 +958,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # information to the ParDo.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('nums', range(5))
-       | beam.FlatMap('mod dup', lambda x: (x % 2, x)))
+       | 'nums' >> beam.Create(range(5))
+       | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x)))
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform Create(nums)',
@@ -971,9 +971,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # information to GBK-only.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('nums', range(5)).with_output_types(int)
-       | beam.Map('mod dup', lambda x: (x % 2, x))
-       | beam.GroupByKeyOnly('G'))
+       | 'nums' >> beam.Create(range(5)).with_output_types(int)
+       | 'mod dup' >> beam.Map(lambda x: (x % 2, x))
+       | 'G' >> beam.GroupByKeyOnly())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -986,8 +986,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # The pipeline below should raise a TypeError, however pipeline type
     # checking was disabled above.
     (self.p
-     | beam.Create('t', [1, 2, 3]).with_output_types(int)
-     | beam.Map('lower', lambda x: x.lower())
+     | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+     | 'lower' >> beam.Map(lambda x: x.lower())
      .with_input_types(str).with_output_types(str))
 
   def test_run_time_type_checking_enabled_type_violation(self):
@@ -1002,8 +1002,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Function above has been type-hinted to only accept an int. But in the
     # pipeline execution it'll be passed a string due to the output of Create.
     (self.p
-     | beam.Create('t', ['some_string'])
-     | beam.Map('to str', int_to_string))
+     | 't' >> beam.Create(['some_string'])
+     | 'to str' >> beam.Map(int_to_string))
     with self.assertRaises(typehints.TypeCheckError) as e:
       self.p.run()
 
@@ -1026,10 +1026,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # Pipeline checking is off, but the above function should satisfy types at
     # run-time.
     result = (self.p
-              | beam.Create('t', ['t', 'e', 's', 't', 'i', 'n', 'g'])
+              | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
               .with_output_types(str)
-              | beam.Map('gen keys', group_with_upper_ord)
-              | beam.GroupByKey('O'))
+              | 'gen keys' >> beam.Map(group_with_upper_ord)
+              | 'O' >> beam.GroupByKey())
 
     assert_that(result, equal_to([(1, ['g']),
                                   (3, ['s', 'i', 'n']),
@@ -1048,9 +1048,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return (a % 2, a)
 
     (self.p
-     | beam.Create('nums', range(5)).with_output_types(int)
-     | beam.Map('is even', is_even_as_key)
-     | beam.GroupByKey('parity'))
+     | 'nums' >> beam.Create(range(5)).with_output_types(int)
+     | 'is even' >> beam.Map(is_even_as_key)
+     | 'parity' >> beam.GroupByKey())
 
     # Although all the types appear to be correct when checked at pipeline
     # construction. Runtime type-checking should detect the 'is_even_as_key' is
@@ -1077,9 +1077,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return (a % 2 == 0, a)
 
     result = (self.p
-              | beam.Create('nums', range(5)).with_output_types(int)
-              | beam.Map('is even', is_even_as_key)
-              | beam.GroupByKey('parity'))
+              | 'nums' >> beam.Create(range(5)).with_output_types(int)
+              | 'is even' >> beam.Map(is_even_as_key)
+              | 'parity' >> beam.GroupByKey())
 
     assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
     self.p.run()
@@ -1092,8 +1092,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # ParDo should receive an instance of type 'str', however an 'int' will be
     # passed instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | beam.Create('n', [1, 2, 3])
-       | (beam.FlatMap('to int', lambda x: [int(x)])
+      (self.p | 'n' >> beam.Create([1, 2, 3])
+       | 'to int' >> (beam.FlatMap(lambda x: [int(x)])
           .with_input_types(str).with_output_types(int))
       )
       self.p.run()
@@ -1111,8 +1111,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
-       | (beam.FlatMap('add', lambda (x, y): [x + y])
+       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | 'add' >> (beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, int]).with_output_types(int))
       )
       self.p.run()
@@ -1136,8 +1136,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         lambda x: [float(x)]).with_input_types(int).with_output_types(
             int).get_type_hints()
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | beam.Create('n', [1, 2, 3])
-       | (beam.FlatMap('to int', lambda x: [float(x)])
+      (self.p | 'n' >> beam.Create([1, 2, 3])
+       | 'to int' >> (beam.FlatMap(lambda x: [float(x)])
           .with_input_types(int).with_output_types(int))
       )
       self.p.run()
@@ -1159,8 +1159,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     # of 'int' will be generated instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)])
-       | (beam.FlatMap('swap', lambda (x, y): [x + y])
+       | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)])
+       | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y])
           .with_input_types(typehints.Tuple[int, float])
           .with_output_types(typehints.Tuple[float, int]))
       )
@@ -1183,7 +1183,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return a + b
 
     with self.assertRaises(typehints.TypeCheckError) as e:
-      (self.p | beam.Create('t', [1, 2, 3, 4]) | beam.Map('add 1', add, 1.0))
+      (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0))
       self.p.run()
 
     self.assertStartswith(
@@ -1199,8 +1199,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('t', [1, 2, 3, 4])
-       | (beam.Map('add 1', lambda x, one: x + one, 1.0)
+       | 't' >> beam.Create([1, 2, 3, 4])
+       | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0)
           .with_input_types(int, int)
           .with_output_types(float)))
       self.p.run()
@@ -1219,8 +1219,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return sum(ints)
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.CombineGlobally('sum', sum_ints))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'sum' >> beam.CombineGlobally(sum_ints))
 
     self.assertEqual(int, d.element_type)
     assert_that(d, equal_to([6]))
@@ -1234,8 +1234,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('m', [1, 2, 3]).with_output_types(int)
-       | beam.CombineGlobally('add', bad_combine))
+       | 'm' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'add' >> beam.CombineGlobally(bad_combine))
 
     self.assertEqual(
         "All functions for a Combine PTransform must accept a "
@@ -1255,9 +1255,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return list(range(n+1))
 
     d = (self.p
-         | beam.Create('t', [1, 2, 3]).with_output_types(int)
-         | beam.CombineGlobally('sum', sum_ints)
-         | beam.ParDo('range', range_from_zero))
+         | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+         | 'sum' >> beam.CombineGlobally(sum_ints)
+         | 'range' >> beam.ParDo(range_from_zero))
 
     self.assertEqual(int, d.element_type)
     assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6]))
@@ -1272,8 +1272,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
       return reduce(operator.mul, ints, 1)
 
     d = (self.p
-         | beam.Create('k', [5, 5, 5, 5]).with_output_types(int)
-         | beam.CombineGlobally('mul', iter_mul))
+         | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+         | 'mul' >> beam.CombineGlobally(iter_mul))
 
     assert_that(d, equal_to([625]))
     self.p.run()
@@ -1290,8 +1290,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('k', [5, 5, 5, 5]).with_output_types(int)
-       | beam.CombineGlobally('mul', iter_mul))
+       | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int)
+       | 'mul' >> beam.CombineGlobally(iter_mul))
       self.p.run()
 
     self.assertStartswith(
@@ -1305,8 +1305,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_combine_pipeline_type_check_using_methods(self):
     d = (self.p
-         | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str)
-         | (beam.CombineGlobally('concat', lambda s: ''.join(s))
+         | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s))
             .with_input_types(str).with_output_types(str)))
 
     def matcher(expected):
@@ -1321,8 +1321,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('s', range(5)).with_output_types(int)
-         | (beam.CombineGlobally('sum', lambda s: sum(s))
+         | 's' >> beam.Create(range(5)).with_output_types(int)
+         | 'sum' >> (beam.CombineGlobally(lambda s: sum(s))
             .with_input_types(int).with_output_types(int)))
 
     assert_that(d, equal_to([10]))
@@ -1331,8 +1331,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_combine_pipeline_type_check_violation_using_methods(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', range(3)).with_output_types(int)
-       | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+       | 'e' >> beam.Create(range(3)).with_output_types(int)
+       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
 
     self.assertEqual("Input type hint violation at sort join: "
@@ -1345,8 +1345,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', range(3)).with_output_types(int)
-       | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
+       | 'e' >> beam.Create(range(3)).with_output_types(int)
+       | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
       self.p.run()
 
@@ -1363,9 +1363,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', range(3)).with_output_types(int)
-       | beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s)))
-       | beam.Map('f', lambda x: x + 1))
+       | 'e' >> beam.Create(range(3)).with_output_types(int)
+       | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
+       | 'f' >> beam.Map(lambda x: x + 1))
 
     self.assertEqual(
         'Pipeline type checking is enabled, '
@@ -1375,8 +1375,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_mean_globally_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | combine.Mean.Globally('mean'))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'mean' >> combine.Mean.Globally())
 
     self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
@@ -1385,8 +1385,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_globally_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('c', ['test']).with_output_types(str)
-       | combine.Mean.Globally('mean'))
+       | 'c' >> beam.Create(['test']).with_output_types(str)
+       | 'mean' >> combine.Mean.Globally())
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
                      "requires Tuple[TypeVariable[K], "
@@ -1398,8 +1398,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | combine.Mean.Globally('mean'))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'mean' >> combine.Mean.Globally())
 
     self.assertTrue(d.element_type is float)
     assert_that(d, equal_to([2.0]))
@@ -1411,8 +1411,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
-       | combine.Mean.Globally('mean'))
+       | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+       | 'mean' >> combine.Mean.Globally())
       self.p.run()
       self.assertEqual("Runtime type violation detected for transform input "
                        "when executing ParDoFlatMap(Combine): Tuple[Any, "
@@ -1427,10 +1427,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_mean_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | (beam.Map('even group', lambda x: (not x % 2, x))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
-         | combine.Mean.PerKey('even mean'))
+         | 'even mean' >> combine.Mean.PerKey())
 
     self.assertCompatible(typehints.KV[bool, float], d.element_type)
     assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1439,10 +1439,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_mean_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('e', map(str, range(5))).with_output_types(str)
-       | (beam.Map('upper pair', lambda x: (x.upper(), x))
+       | 'e' >> beam.Create(map(str, range(5))).with_output_types(str)
+       | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x))
           .with_output_types(typehints.KV[str, str]))
-       | combine.Mean.PerKey('even mean'))
+       | 'even mean' >> combine.Mean.PerKey())
       self.p.run()
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
@@ -1455,10 +1455,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', range(5)).with_output_types(int)
-         | (beam.Map('odd group', lambda x: (bool(x % 2), x))
+         | 'c' >> beam.Create(range(5)).with_output_types(int)
+         | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x))
             .with_output_types(typehints.KV[bool, int]))
-         | combine.Mean.PerKey('odd mean'))
+         | 'odd mean' >> combine.Mean.PerKey())
 
     self.assertCompatible(typehints.KV[bool, float], d.element_type)
     assert_that(d, equal_to([(False, 2.0), (True, 2.0)]))
@@ -1470,10 +1470,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('c', range(5)).with_output_types(int)
-       | (beam.Map('odd group', lambda x: (x, str(bool(x % 2))))
+       | 'c' >> beam.Create(range(5)).with_output_types(int)
+       | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2))))
           .with_output_types(typehints.KV[int, str]))
-       | combine.Mean.PerKey('odd mean'))
+       | 'odd mean' >> combine.Mean.PerKey())
       self.p.run()
 
     self.assertStartswith(
@@ -1494,8 +1494,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_globally_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | beam.Create('p', range(5)).with_output_types(int)
-         | combine.Count.Globally('count int'))
+         | 'p' >> beam.Create(range(5)).with_output_types(int)
+         | 'count int' >> combine.Count.Globally())
 
     self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
@@ -1505,8 +1505,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('p', range(5)).with_output_types(int)
-         | combine.Count.Globally('count int'))
+         | 'p' >> beam.Create(range(5)).with_output_types(int)
+         | 'count int' >> combine.Count.Globally())
 
     self.assertTrue(d.element_type is int)
     assert_that(d, equal_to([5]))
@@ -1514,10 +1514,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perkey_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | beam.Create('p', range(5)).with_output_types(int)
-         | (beam.Map('even group', lambda x: (not x % 2, x))
+         | 'p' >> beam.Create(range(5)).with_output_types(int)
+         | 'even group' >> (beam.Map(lambda x: (not x % 2, x))
             .with_output_types(typehints.KV[bool, int]))
-         | combine.Count.PerKey('count int'))
+         | 'count int' >> combine.Count.PerKey())
 
     self.assertCompatible(typehints.KV[bool, int], d.element_type)
     assert_that(d, equal_to([(False, 2), (True, 3)]))
@@ -1526,8 +1526,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_count_perkey_pipeline_type_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('p', range(5)).with_output_types(int)
-       | combine.Count.PerKey('count int'))
+       | 'p' >> beam.Create(range(5)).with_output_types(int)
+       | 'count int' >> combine.Count.PerKey())
 
     self.assertEqual("Input type hint violation at GroupByKey: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1538,10 +1538,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str)
-         | beam.Map('dup key', lambda x: (x, x))
+         | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
+         | 'dup key' >> beam.Map(lambda x: (x, x))
          .with_output_types(typehints.KV[str, str])
-         | combine.Count.PerKey('count dups'))
+         | 'count dups' >> combine.Count.PerKey())
 
     self.assertCompatible(typehints.KV[str, int], d.element_type)
     assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)]))
@@ -1549,8 +1549,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_count_perelement_pipeline_type_checking_satisfied(self):
     d = (self.p
-         | beam.Create('w', [1, 1, 2, 3]).with_output_types(int)
-         | combine.Count.PerElement('count elems'))
+         | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int)
+         | 'count elems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[int, int], d.element_type)
     assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)]))
@@ -1561,8 +1561,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('f', [1, 1, 2, 3])
-       | combine.Count.PerElement('count elems'))
+       | 'f' >> beam.Create([1, 1, 2, 3])
+       | 'count elems' >> combine.Count.PerElement())
 
     self.assertEqual('Pipeline type checking is enabled, however no output '
                      'type-hint was found for the PTransform '
@@ -1573,9 +1573,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('w', [True, True, False, True, True])
+         | 'w' >> beam.Create([True, True, False, True, True])
          .with_output_types(bool)
-         | combine.Count.PerElement('count elems'))
+         | 'count elems' >> combine.Count.PerElement())
 
     self.assertCompatible(typehints.KV[bool, int], d.element_type)
     assert_that(d, equal_to([(False, 1), (True, 4)]))
@@ -1583,8 +1583,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_top_of_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('n', range(5, 11)).with_output_types(int)
-         | combine.Top.Of('top 3', 3, lambda x, y: x < y))
+         | 'n' >> beam.Create(range(5, 11)).with_output_types(int)
+         | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[int],
                           d.element_type)
@@ -1595,8 +1595,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('n', list('testing')).with_output_types(str)
-         | combine.Top.Of('acii top', 3, lambda x, y: x < y))
+         | 'n' >> beam.Create(list('testing')).with_output_types(str)
+         | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y))
 
     self.assertCompatible(typehints.Iterable[str], d.element_type)
     assert_that(d, equal_to([['t', 't', 's']]))
@@ -1605,9 +1605,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_per_key_pipeline_checking_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('n', range(100)).with_output_types(int)
-       | beam.Map('num + 1', lambda x: x + 1).with_output_types(int)
-       | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+       | 'n' >> beam.Create(range(100)).with_output_types(int)
+       | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int)
+       | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertEqual("Input type hint violation at GroupByKey: "
                      "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1616,10 +1616,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_per_key_pipeline_checking_satisfied(self):
     d = (self.p
-         | beam.Create('n', range(100)).with_output_types(int)
-         | (beam.Map('group mod 3', lambda x: (x % 3, x))
+         | 'n' >> beam.Create(range(100)).with_output_types(int)
+         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
-         | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+         | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1630,10 +1630,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('n', range(21))
-         | (beam.Map('group mod 3', lambda x: (x % 3, x))
+         | 'n' >> beam.Create(range(21))
+         | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x))
             .with_output_types(typehints.KV[int, int]))
-         | combine.Top.PerKey('top mod', 1, lambda a, b: a < b))
+         | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1642,8 +1642,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_globally_pipeline_satisfied(self):
     d = (self.p
-         | beam.Create('m', [2, 2, 3, 3]).with_output_types(int)
-         | combine.Sample.FixedSizeGlobally('sample', 3))
+         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | 'sample' >> combine.Sample.FixedSizeGlobally(3))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
 
@@ -1658,8 +1658,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('m', [2, 2, 3, 3]).with_output_types(int)
-         | combine.Sample.FixedSizeGlobally('sample', 2))
+         | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int)
+         | 'sample' >> combine.Sample.FixedSizeGlobally(2))
 
     self.assertCompatible(typehints.Iterable[int], d.element_type)
 
@@ -1672,9 +1672,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_sample_per_key_pipeline_satisfied(self):
     d = (self.p
-         | (beam.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
-         | combine.Sample.FixedSizePerKey('sample', 2))
+         | 'sample' >> combine.Sample.FixedSizePerKey(2))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1691,9 +1691,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | (beam.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)])
+         | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)])
             .with_output_types(typehints.KV[int, int]))
-         | combine.Sample.FixedSizePerKey('sample', 1))
+         | 'sample' >> combine.Sample.FixedSizePerKey(1))
 
     self.assertCompatible(typehints.KV[int, typehints.Iterable[int]],
                           d.element_type)
@@ -1708,8 +1708,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_to_list_pipeline_check_satisfied(self):
     d = (self.p
-         | beam.Create('c', (1, 2, 3, 4)).with_output_types(int)
-         | combine.ToList('to list'))
+         | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int)
+         | 'to list' >> combine.ToList())
 
     self.assertCompatible(typehints.List[int], d.element_type)
 
@@ -1724,8 +1724,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | beam.Create('c', list('test')).with_output_types(str)
-         | combine.ToList('to list'))
+         | 'c' >> beam.Create(list('test')).with_output_types(str)
+         | 'to list' >> combine.ToList())
 
     self.assertCompatible(typehints.List[str], d.element_type)
 
@@ -1739,8 +1739,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_to_dict_pipeline_check_violated(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create('d', [1, 2, 3, 4]).with_output_types(int)
-       | combine.ToDict('to dict'))
+       | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int)
+       | 'to dict' >> combine.ToDict())
 
     self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': "
                      "requires Tuple[TypeVariable[K], "
@@ -1753,7 +1753,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
          | beam.Create(
              'd',
              [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int])
-         | combine.ToDict('to dict'))
+         | 'to dict' >> combine.ToDict())
 
     self.assertCompatible(typehints.Dict[int, int], d.element_type)
     assert_that(d, equal_to([{1: 2, 3: 4}]))
@@ -1763,9 +1763,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
     self.p.options.view_as(TypeOptions).runtime_type_check = True
 
     d = (self.p
-         | (beam.Create('d', [('1', 2), ('3', 4)])
+         | 'd' >> (beam.Create([('1', 2), ('3', 4)])
             .with_output_types(typehints.Tuple[str, int]))
-         | combine.ToDict('to dict'))
+         | 'to dict' >> combine.ToDict())
 
     self.assertCompatible(typehints.Dict[str, int], d.element_type)
     assert_that(d, equal_to([{'1': 2, '3': 4}]))
@@ -1776,8 +1776,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
     with self.assertRaises(TypeError) as e:
       (self.p
-       | beam.Create('t', [1, 2, 3]).with_output_types(int)
-       | beam.Map('len', lambda x: len(x)).with_output_types(int))
+       | 't' >> beam.Create([1, 2, 3]).with_output_types(int)
+       | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int))
       self.p.run()
 
     # Our special type-checking related TypeError shouldn't have been raised.
@@ -1799,8 +1799,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
         beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int]))
 
   def test_pipeline_inference(self):
-    created = self.p | beam.Create('c', ['a', 'b', 'c'])
-    mapped = created | beam.Map('pair with 1', lambda x: (x, 1))
+    created = self.p | 'c' >> beam.Create(['a', 'b', 'c'])
+    mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1))
     grouped = mapped | beam.GroupByKey()
     self.assertEqual(str, created.element_type)
     self.assertEqual(typehints.KV[str, int], mapped.element_type)
@@ -1810,8 +1810,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   def test_inferred_bad_kv_type(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
       _ = (self.p
-           | beam.Create('t', ['a', 'b', 'c'])
-           | beam.Map('ungroupable', lambda x: (x, 0, 1.0))
+           | 't' >> beam.Create(['a', 'b', 'c'])
+           | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0))
            | beam.GroupByKey())
 
     self.assertEqual('Input type hint violation at GroupByKey: '
@@ -1821,11 +1821,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
   def test_type_inference_command_line_flag_toggle(self):
     self.p.options.view_as(TypeOptions).pipeline_type_check = False
-    x = self.p | beam.Create('t', [1, 2, 3, 4])
+    x = self.p | 't' >> beam.Create([1, 2, 3, 4])
     self.assertIsNone(x.element_type)
 
     self.p.options.view_as(TypeOptions).pipeline_type_check = True
-    x = self.p | beam.Create('m', [1, 2, 3, 4])
+    x = self.p | 'm' >> beam.Create([1, 2, 3, 4])
     self.assertEqual(int, x.element_type)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index b7a121d..bbb7787 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -160,9 +160,9 @@ def KvSwap(label='KvSwap'):  # pylint: disable=invalid-name
 def RemoveDuplicates(pcoll):  # pylint: disable=invalid-name
   """Produces a PCollection containing the unique elements of a PCollection."""
   return (pcoll
-          | Map('ToPairs', lambda v: (v, None))
-          | CombinePerKey('Group', lambda vs: None)
-          | Keys('RemoveDuplicates'))
+          | 'ToPairs' >> Map(lambda v: (v, None))
+          | 'Group' >> CombinePerKey(lambda vs: None)
+          | 'RemoveDuplicates' >> Keys())
 
 
 class DataflowAssertException(Exception):
@@ -220,7 +220,7 @@ def assert_that(actual, matcher, label='assert_that'):
   class AssertThat(PTransform):
 
     def apply(self, pipeline):
-      return pipeline | Create('singleton', [None]) | Map(match, AllOf(actual))
+      return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual))
 
     def default_label(self):
       return label

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 186fcd4..012dde4 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -127,14 +127,14 @@ class WindowTest(unittest.TestCase):
     self.assertEqual([IntervalWindow(2, 25)], merge(2, 15, 10))
 
   def timestamped_key_values(self, pipeline, key, *timestamps):
-    return (pipeline | Create('start', timestamps)
+    return (pipeline | 'start' >> Create(timestamps)
             | Map(lambda x: WindowedValue((key, x), x, [])))
 
   def test_sliding_windows(self):
     p = Pipeline('DirectPipelineRunner')
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3)
     result = (pcoll
-              | WindowInto('w', SlidingWindows(period=2, size=4))
+              | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
               | GroupByKey()
               | reify_windows)
     expected = [('key @ [-2.0, 2.0)', [1]),
@@ -147,7 +147,7 @@ class WindowTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)
     result = (pcoll
-              | WindowInto('w', Sessions(10))
+              | 'w' >> WindowInto(Sessions(10))
               | GroupByKey()
               | sort_values
               | reify_windows)
@@ -159,9 +159,9 @@ class WindowTest(unittest.TestCase):
   def test_timestamped_value(self):
     p = Pipeline('DirectPipelineRunner')
     result = (p
-              | Create('start', [(k, k) for k in range(10)])
+              | 'start' >> Create([(k, k) for k in range(10)])
               | Map(lambda (x, t): TimestampedValue(x, t))
-              | WindowInto('w', FixedWindows(5))
+              | 'w' >> WindowInto(FixedWindows(5))
               | Map(lambda v: ('key', v))
               | GroupByKey())
     assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
@@ -172,13 +172,13 @@ class WindowTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     result = (p
               # Create some initial test values.
-              | Create('start', [(k, k) for k in range(10)])
+              | 'start' >> Create([(k, k) for k in range(10)])
               # The purpose of the WindowInto transform is to establish a
               # FixedWindows windowing function for the PCollection.
               # It does not bucket elements into windows since the timestamps
               # from Create are not spaced 5 ms apart and very likely they all
               # fall into the same window.
-              | WindowInto('w', FixedWindows(5))
+              | 'w' >> WindowInto(FixedWindows(5))
               # Generate timestamped values using the values as timestamps.
               # Now there are values 5 ms apart and since Map propagates the
               # windowing function from input to output the output PCollection

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 51163bc..af3668c 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -100,7 +100,7 @@ class WriteTest(unittest.TestCase):
     write_to_test_sink = WriteToTestSink(return_init_result,
                                          return_write_results)
     p = Pipeline(options=PipelineOptions([]))
-    result = p | beam.Create('start', data) | write_to_test_sink
+    result = p | 'start' >> beam.Create(data) | write_to_test_sink
 
     assert_that(result, is_empty())
     p.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index b25f231..4e1ab68 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -80,7 +80,7 @@ class MainInputTest(unittest.TestCase):
       ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
 
     with self.assertRaises(typehints.TypeCheckError):
-      [1, 2, 3] | (beam.ParDo(MyDoFn()) | beam.ParDo('again', MyDoFn()))
+      [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
 
   def test_typed_dofn_instance(self):
     class MyDoFn(beam.DoFn):
@@ -95,7 +95,7 @@ class MainInputTest(unittest.TestCase):
       ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
 
     with self.assertRaises(typehints.TypeCheckError):
-      [1, 2, 3] | (beam.ParDo(my_do_fn) | beam.ParDo('again', my_do_fn))
+      [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
 
 
 class SideInputTest(unittest.TestCase):
@@ -170,14 +170,14 @@ class SideInputTest(unittest.TestCase):
       return s * times
     p = beam.Pipeline(options=PipelineOptions([]))
     main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | beam.Create('side', [3])
+    side_input = p | 'side' >> beam.Create([3])
     result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input))
     assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc']))
     p.run()
 
-    bad_side_input = p | beam.Create('bad_side', ['z'])
+    bad_side_input = p | 'bad_side' >> beam.Create(['z'])
     with self.assertRaises(typehints.TypeCheckError):
-      main_input | beam.Map('again', repeat, pvalue.AsSingleton(bad_side_input))
+      main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
 
   def test_deferred_side_input_iterable(self):
     @typehints.with_input_types(str, typehints.Iterable[str])
@@ -185,14 +185,14 @@ class SideInputTest(unittest.TestCase):
       return glue.join(sorted(items))
     p = beam.Pipeline(options=PipelineOptions([]))
     main_input = p | beam.Create(['a', 'bb', 'c'])
-    side_input = p | beam.Create('side', ['x', 'y', 'z'])
+    side_input = p | 'side' >> beam.Create(['x', 'y', 'z'])
     result = main_input | beam.Map(concat, pvalue.AsIter(side_input))
     assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz']))
     p.run()
 
-    bad_side_input = p | beam.Create('bad_side', [1, 2, 3])
+    bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3])
     with self.assertRaises(typehints.TypeCheckError):
-      main_input | beam.Map('fail', concat, pvalue.AsIter(bad_side_input))
+      main_input | 'fail' >> beam.Map(concat, pvalue.AsIter(bad_side_input))
 
 
 class CustomTransformTest(unittest.TestCase):

Reply via email to