http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index a747112..0439fe1 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -39,16 +39,16 @@ class CombineTest(unittest.TestCase): size = len(vals) # First for global combines. - pcoll = pipeline | Create('start', vals) - result_mean = pcoll | combine.Mean.Globally('mean') - result_count = pcoll | combine.Count.Globally('count') + pcoll = pipeline | 'start' >> Create(vals) + result_mean = pcoll | 'mean' >> combine.Mean.Globally() + result_count = pcoll | 'count' >> combine.Count.Globally() assert_that(result_mean, equal_to([mean]), label='assert:mean') assert_that(result_count, equal_to([size]), label='assert:size') # Again for per-key combines. - pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals]) - result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey') - result_key_count = pcoll | combine.Count.PerKey('count-perkey') + pcoll = pipeline | 'start-perkey' >> Create([('a', x) for x in vals]) + result_key_mean = pcoll | 'mean-perkey' >> combine.Mean.PerKey() + result_key_count = pcoll | 'count-perkey' >> combine.Count.PerKey() assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean') assert_that(result_key_count, equal_to([('a', size)]), label='key:size') pipeline.run() @@ -66,9 +66,9 @@ class CombineTest(unittest.TestCase): 9: 'nniiinne'} # First for global combines. 
- pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) - result_top = pcoll | combine.Top.Largest('top', 5) - result_bot = pcoll | combine.Top.Smallest('bot', 4) + pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) + result_top = pcoll | 'top' >> combine.Top.Largest(5) + result_bot = pcoll | 'bot' >> combine.Top.Smallest(4) result_cmp = pcoll | combine.Top.Of( 'cmp', 6, @@ -81,8 +81,8 @@ class CombineTest(unittest.TestCase): # Again for per-key combines. pcoll = pipeline | Create( 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) - result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5) - result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4) + result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5) + result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4) result_key_cmp = pcoll | combine.Top.PerKey( 'cmp-perkey', 6, @@ -99,15 +99,15 @@ class CombineTest(unittest.TestCase): def test_top_shorthands(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) - result_top = pcoll | beam.CombineGlobally('top', combine.Largest(5)) - result_bot = pcoll | beam.CombineGlobally('bot', combine.Smallest(4)) + pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) + result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5)) + result_bot = pcoll | 'bot' >> beam.CombineGlobally(combine.Smallest(4)) assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top') assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot') pcoll = pipeline | Create( 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) - result_ktop = pcoll | beam.CombinePerKey('top-perkey', combine.Largest(5)) + result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5)) result_kbot = pcoll | beam.CombinePerKey( 'bot-perkey', combine.Smallest(4)) assert_that(result_ktop, 
equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top') @@ -119,7 +119,7 @@ class CombineTest(unittest.TestCase): # First test global samples (lots of them). for ix in xrange(300): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | Create('start', [1, 1, 2, 2]) + pcoll = pipeline | 'start' >> Create([1, 1, 2, 2]) result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3) def matcher(): @@ -141,7 +141,7 @@ class CombineTest(unittest.TestCase): pcoll = pipeline | Create( 'start-perkey', sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), [])) - result = pcoll | combine.Sample.FixedSizePerKey('sample', 3) + result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3) def matcher(): def match(actual): @@ -158,7 +158,7 @@ class CombineTest(unittest.TestCase): p = Pipeline('DirectPipelineRunner') result = ( p - | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) + | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) | beam.CombineGlobally(combine.TupleCombineFn(max, combine.MeanCombineFn(), sum)).without_defaults()) @@ -179,8 +179,8 @@ class CombineTest(unittest.TestCase): def test_to_list_and_to_dict(self): pipeline = Pipeline('DirectPipelineRunner') the_list = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6] - pcoll = pipeline | Create('start', the_list) - result = pcoll | combine.ToList('to list') + pcoll = pipeline | 'start' >> Create(the_list) + result = pcoll | 'to list' >> combine.ToList() def matcher(expected): def match(actual): @@ -191,8 +191,8 @@ class CombineTest(unittest.TestCase): pipeline = Pipeline('DirectPipelineRunner') pairs = [(1, 2), (3, 4), (5, 6)] - pcoll = pipeline | Create('start-pairs', pairs) - result = pcoll | combine.ToDict('to dict') + pcoll = pipeline | 'start-pairs' >> Create(pairs) + result = pcoll | 'to dict' >> combine.ToDict() def matcher(): def match(actual): @@ -221,8 +221,8 @@ class CombineTest(unittest.TestCase): return main | Map(lambda _, s: s, side) p = Pipeline('DirectPipelineRunner') - result1 = p | 
Create('label1', []) | CombineWithSideInput('L1') - result2 = p | Create('label2', [1, 2, 3, 4]) | CombineWithSideInput('L2') + result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput() + result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput() assert_that(result1, equal_to([0]), label='r1') assert_that(result2, equal_to([10]), label='r2') p.run()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index b288321..44a6d29 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -814,12 +814,12 @@ class CombineGlobally(PTransform): return transform combined = (pcoll - | add_input_types(Map('KeyWithVoid', lambda v: (None, v)) + | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v)) .with_output_types( KV[None, pcoll.element_type])) | CombinePerKey( 'CombinePerKey', self.fn, *self.args, **self.kwargs) - | Map('UnKey', lambda (k, v): v)) + | 'UnKey' >> Map(lambda (k, v): v)) if not self.has_defaults and not self.as_view: return combined @@ -851,8 +851,8 @@ class CombineGlobally(PTransform): else: return transform return (pcoll.pipeline - | Create('DoOnce', [None]) - | typed(Map('InjectDefault', lambda _, s: s, view))) + | 'DoOnce' >> Create([None]) + | 'InjectDefault' >> typed(Map(lambda _, s: s, view))) class CombinePerKey(PTransformWithSideInputs): @@ -1045,9 +1045,9 @@ class GroupByKey(PTransform): gbk_output_type = KV[key_type, Iterable[value_type]] return (pcoll - | (ParDo('reify_windows', self.ReifyWindows()) + | 'reify_windows' >> (ParDo(self.ReifyWindows()) .with_output_types(reify_output_type)) - | (GroupByKeyOnly('group_by_key') + | 'group_by_key' >> (GroupByKeyOnly() .with_input_types(reify_output_type) .with_output_types(gbk_input_type)) | (ParDo('group_by_window', @@ -1056,8 +1056,8 @@ class GroupByKey(PTransform): .with_output_types(gbk_output_type))) else: return (pcoll - | ParDo('reify_windows', self.ReifyWindows()) - | GroupByKeyOnly('group_by_key') + | 'reify_windows' >> ParDo(self.ReifyWindows()) + | 'group_by_key' >> GroupByKeyOnly() | ParDo('group_by_window', self.GroupAlsoByWindow(pcoll.windowing))) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index b652bca..6eb28b0 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -691,7 +691,7 @@ def ptransform_fn(fn): With either method the custom PTransform can be used in pipelines as if it were one of the "native" PTransforms:: - result_pcoll = input_pcoll | CustomMapper('label', somefn) + result_pcoll = input_pcoll | 'label' >> CustomMapper(somefn) Note that for both solutions the underlying implementation of the pipe operator (i.e., `|`) will inject the pcoll argument in its proper place http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index feb081c..8121c1e 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -53,12 +53,12 @@ class PTransformTest(unittest.TestCase): str(PTransform())) pa = Pipeline('DirectPipelineRunner') - res = pa | beam.Create('a_label', [1, 2]) + res = pa | 'a_label' >> beam.Create([1, 2]) self.assertEqual('<Create(PTransform) label=[a_label]>', str(res.producer.transform)) pc = Pipeline('DirectPipelineRunner') - res = pc | beam.Create('with_inputs', [1, 2]) + res = pc | 'with_inputs' >> beam.Create([1, 2]) inputs_tr = res.producer.transform inputs_tr.inputs = ('ci',) self.assertEqual( @@ -66,7 +66,7 @@ class PTransformTest(unittest.TestCase): str(inputs_tr)) pd = Pipeline('DirectPipelineRunner') - res = pd | beam.Create('with_sidei', [1, 
2]) + res = pd | 'with_sidei' >> beam.Create([1, 2]) side_tr = res.producer.transform side_tr.side_inputs = (4,) self.assertEqual( @@ -110,8 +110,8 @@ class PTransformTest(unittest.TestCase): return [context.element + addon] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [1, 2, 3]) - result = pcoll | beam.ParDo('do', AddNDoFn(), 10) + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) + result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10) assert_that(result, equal_to([11, 12, 13])) pipeline.run() @@ -122,21 +122,21 @@ class PTransformTest(unittest.TestCase): pass pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [1, 2, 3]) + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) with self.assertRaises(ValueError): - pcoll | beam.ParDo('do', MyDoFn) # Note the lack of ()'s + pcoll | 'do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s def test_do_with_callable(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [1, 2, 3]) - result = pcoll | beam.FlatMap('do', lambda x, addon: [x + addon], 10) + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) + result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_side_input_as_arg(self): pipeline = Pipeline('DirectPipelineRunner') - side = pipeline | beam.Create('side', [10]) - pcoll = pipeline | beam.Create('start', [1, 2, 3]) + side = pipeline | 'side' >> beam.Create([10]) + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) result = pcoll | beam.FlatMap( 'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side)) assert_that(result, equal_to([11, 12, 13])) @@ -144,8 +144,8 @@ class PTransformTest(unittest.TestCase): def test_do_with_side_input_as_keyword_arg(self): pipeline = Pipeline('DirectPipelineRunner') - side = pipeline | beam.Create('side', [10]) - pcoll = pipeline | beam.Create('start', [1, 2, 3]) + side = 
pipeline | 'side' >> beam.Create([10]) + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) result = pcoll | beam.FlatMap( 'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side)) assert_that(result, equal_to([11, 12, 13])) @@ -153,8 +153,8 @@ class PTransformTest(unittest.TestCase): def test_do_with_do_fn_returning_string_raises_warning(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', ['2', '9', '3']) - pcoll | beam.FlatMap('do', lambda x: x + '1') + pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) + pcoll | 'do' >> beam.FlatMap(lambda x: x + '1') # Since the DoFn directly returns a string we should get an error warning # us. @@ -167,8 +167,8 @@ class PTransformTest(unittest.TestCase): def test_do_with_do_fn_returning_dict_raises_warning(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', ['2', '9', '3']) - pcoll | beam.FlatMap('do', lambda x: {x: '1'}) + pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) + pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'}) # Since the DoFn directly returns a dict we should get an error warning # us. 
@@ -181,9 +181,9 @@ class PTransformTest(unittest.TestCase): def test_do_with_side_outputs_maintains_unique_name(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [1, 2, 3]) - r1 = pcoll | beam.FlatMap('a', lambda x: [x + 1]).with_outputs(main='m') - r2 = pcoll | beam.FlatMap('b', lambda x: [x + 2]).with_outputs(main='m') + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) + r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m') + r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m') assert_that(r1.m, equal_to([2, 3, 4]), label='r1') assert_that(r2.m, equal_to([3, 4, 5]), label='r2') pipeline.run() @@ -194,8 +194,8 @@ class PTransformTest(unittest.TestCase): def incorrect_par_do_fn(x): return x + 5 pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [2, 9, 3]) - pcoll | beam.FlatMap('do', incorrect_par_do_fn) + pcoll = pipeline | 'start' >> beam.Create([2, 9, 3]) + pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn) # It's a requirement that all user-defined functions to a ParDo return # an iterable. with self.assertRaises(typehints.TypeCheckError) as cm: @@ -215,8 +215,8 @@ class PTransformTest(unittest.TestCase): def finish_bundle(self, c): yield 'finish' pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [1, 2, 3]) - result = pcoll | beam.ParDo('do', MyDoFn()) + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) + result = pcoll | 'do' >> beam.ParDo(MyDoFn()) # May have many bundles, but each has a start and finish. 
def matcher(): @@ -230,7 +230,7 @@ class PTransformTest(unittest.TestCase): def test_filter(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [1, 2, 3, 4]) + pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4]) result = pcoll | beam.Filter( 'filter', lambda x: x % 2 == 0) assert_that(result, equal_to([2, 4])) @@ -256,15 +256,15 @@ class PTransformTest(unittest.TestCase): def test_combine_with_combine_fn(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', vals) - result = pcoll | beam.CombineGlobally('mean', self._MeanCombineFn()) + pcoll = pipeline | 'start' >> beam.Create(vals) + result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn()) assert_that(result, equal_to([sum(vals) / len(vals)])) pipeline.run() def test_combine_with_callable(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', vals) + pcoll = pipeline | 'start' >> beam.Create(vals) result = pcoll | beam.CombineGlobally(sum) assert_that(result, equal_to([sum(vals)])) pipeline.run() @@ -272,8 +272,8 @@ class PTransformTest(unittest.TestCase): def test_combine_with_side_input_as_arg(self): values = [1, 2, 3, 4, 5, 6, 7] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', values) - divisor = pipeline | beam.Create('divisor', [2]) + pcoll = pipeline | 'start' >> beam.Create(values) + divisor = pipeline | 'divisor' >> beam.Create([2]) result = pcoll | beam.CombineGlobally( 'max', # Multiples of divisor only. 
@@ -287,9 +287,9 @@ class PTransformTest(unittest.TestCase): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] + + pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) - result = pcoll | beam.CombinePerKey('mean', self._MeanCombineFn()) + result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn()) assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)), ('b', sum(vals_2) / len(vals_2))])) pipeline.run() @@ -298,7 +298,7 @@ class PTransformTest(unittest.TestCase): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] + + pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | beam.CombinePerKey(sum) assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))])) @@ -308,9 +308,9 @@ class PTransformTest(unittest.TestCase): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', ([('a', x) for x in vals_1] + + pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) - divisor = pipeline | beam.Create('divisor', [2]) + divisor = pipeline | 'divisor' >> beam.Create([2]) result = pcoll | beam.CombinePerKey( lambda vals, d: max(v for v in vals if v % d == 0), pvalue.AsSingleton(divisor)) # Multiples of divisor only. 
@@ -323,7 +323,7 @@ class PTransformTest(unittest.TestCase): pipeline = Pipeline('DirectPipelineRunner') pcoll = pipeline | beam.Create( 'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]) - result = pcoll | beam.GroupByKey('group') + result = pcoll | 'group' >> beam.GroupByKey() assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])])) pipeline.run() @@ -335,9 +335,9 @@ class PTransformTest(unittest.TestCase): return (context.element % 3) + offset pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8]) + pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) # Attempt nominal partition operation. - partitions = pcoll | beam.Partition('part1', SomePartitionFn(), 4, 1) + partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1) assert_that(partitions[0], equal_to([])) assert_that(partitions[1], equal_to([0, 3, 6]), label='p1') assert_that(partitions[2], equal_to([1, 4, 7]), label='p2') @@ -347,14 +347,14 @@ class PTransformTest(unittest.TestCase): # Check that a bad partition label will yield an error. For the # DirectPipelineRunner, this error manifests as an exception. 
pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8]) - partitions = pcoll | beam.Partition('part2', SomePartitionFn(), 4, 10000) + pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) + partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000) with self.assertRaises(ValueError): pipeline.run() def test_partition_with_callable(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', [0, 1, 2, 3, 4, 5, 6, 7, 8]) + pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) partitions = ( pcoll | beam.Partition( 'part', @@ -370,49 +370,49 @@ class PTransformTest(unittest.TestCase): """Regression test for an issue with how partitions are handled.""" pipeline = Pipeline('DirectPipelineRunner') contents = [('aa', 1), ('bb', 2), ('aa', 2)] - created = pipeline | beam.Create('A', contents) - partitioned = created | beam.Partition('B', lambda x, n: len(x) % n, 3) - flattened = partitioned | beam.Flatten('C') - grouped = flattened | beam.GroupByKey('D') + created = pipeline | 'A' >> beam.Create(contents) + partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3) + flattened = partitioned | 'C' >> beam.Flatten() + grouped = flattened | 'D' >> beam.GroupByKey() assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])])) pipeline.run() def test_flatten_pcollections(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3]) - pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7]) - result = (pcoll_1, pcoll_2) | beam.Flatten('flatten') + pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) + pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) + result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten() assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7])) pipeline.run() def test_flatten_no_pcollections(self): pipeline = 
Pipeline('DirectPipelineRunner') with self.assertRaises(ValueError): - () | beam.Flatten('pipeline arg missing') - result = () | beam.Flatten('empty', pipeline=pipeline) + () | 'pipeline arg missing' >> beam.Flatten() + result = () | 'empty' >> beam.Flatten(pipeline=pipeline) assert_that(result, equal_to([])) pipeline.run() def test_flatten_pcollections_in_iterable(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll_1 = pipeline | beam.Create('start_1', [0, 1, 2, 3]) - pcoll_2 = pipeline | beam.Create('start_2', [4, 5, 6, 7]) + pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) + pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) result = ([pcoll for pcoll in (pcoll_1, pcoll_2)] - | beam.Flatten('flatten')) + | 'flatten' >> beam.Flatten()) assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7])) pipeline.run() def test_flatten_input_type_must_be_iterable(self): # Inputs to flatten *must* be an iterable. with self.assertRaises(ValueError): - 4 | beam.Flatten('flatten') + 4 | 'flatten' >> beam.Flatten() def test_flatten_input_type_must_be_iterable_of_pcolls(self): # Inputs to flatten *must* be an iterable of PCollections. 
with self.assertRaises(TypeError): - {'l': 'test'} | beam.Flatten('flatten') + {'l': 'test'} | 'flatten' >> beam.Flatten() with self.assertRaises(TypeError): - set([1, 2, 3]) | beam.Flatten('flatten') + set([1, 2, 3]) | 'flatten' >> beam.Flatten() def test_co_group_by_key_on_list(self): pipeline = Pipeline('DirectPipelineRunner') @@ -420,7 +420,7 @@ class PTransformTest(unittest.TestCase): 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) pcoll_2 = pipeline | beam.Create( 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = (pcoll_1, pcoll_2) | beam.CoGroupByKey('cgbk') + result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey() assert_that(result, equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])), ('c', ([4], [7, 8]))])) @@ -433,7 +433,7 @@ class PTransformTest(unittest.TestCase): pcoll_2 = pipeline | beam.Create( 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) result = ([pc for pc in (pcoll_1, pcoll_2)] - | beam.CoGroupByKey('cgbk')) + | 'cgbk' >> beam.CoGroupByKey()) assert_that(result, equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])), ('c', ([4], [7, 8]))])) @@ -445,7 +445,7 @@ class PTransformTest(unittest.TestCase): 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) pcoll_2 = pipeline | beam.Create( 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey('cgbk') + result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey() assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}), ('b', {'X': [3], 'Y': []}), ('c', {'X': [4], 'Y': [7, 8]})])) @@ -453,10 +453,10 @@ class PTransformTest(unittest.TestCase): def test_group_by_key_input_must_be_kv_pairs(self): pipeline = Pipeline('DirectPipelineRunner') - pcolls = pipeline | beam.Create('A', [1, 2, 3, 4, 5]) + pcolls = pipeline | 'A' >> beam.Create([1, 2, 3, 4, 5]) with self.assertRaises(typehints.TypeCheckError) as e: - pcolls | beam.GroupByKey('D') + pcolls | 'D' >> beam.GroupByKey() pipeline.run() 
self.assertStartswith( @@ -466,9 +466,9 @@ class PTransformTest(unittest.TestCase): def test_group_by_key_only_input_must_be_kv_pairs(self): pipeline = Pipeline('DirectPipelineRunner') - pcolls = pipeline | beam.Create('A', ['a', 'b', 'f']) + pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f']) with self.assertRaises(typehints.TypeCheckError) as cm: - pcolls | beam.GroupByKeyOnly('D') + pcolls | 'D' >> beam.GroupByKeyOnly() pipeline.run() expected_error_prefix = ('Input type hint violation at D: expected ' @@ -506,20 +506,20 @@ class PTransformTest(unittest.TestCase): t = (beam.Map(lambda x: (x, 1)) | beam.GroupByKey() | beam.Map(lambda (x, ones): (x, sum(ones)))) - result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t + result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t assert_that(result, equal_to([('a', 2), ('b', 1)])) pipeline.run() def test_apply_to_list(self): self.assertItemsEqual( - [1, 2, 3], [0, 1, 2] | beam.Map('add_one', lambda x: x + 1)) - self.assertItemsEqual([1], [0, 1, 2] | beam.Filter('odd', lambda x: x % 2)) + [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1)) + self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2)) self.assertItemsEqual([1, 2, 100, 3], - ([1, 2, 3], [100]) | beam.Flatten('flat')) + ([1, 2, 3], [100]) | 'flat' >> beam.Flatten()) join_input = ([('k', 'a')], [('k', 'b'), ('k', 'c')]) self.assertItemsEqual([('k', (['a'], ['b', 'c']))], - join_input | beam.CoGroupByKey('join')) + join_input | 'join' >> beam.CoGroupByKey()) def test_multi_input_ptransform(self): class DisjointUnion(PTransform): @@ -583,7 +583,7 @@ class PTransformLabelsTest(unittest.TestCase): gbk = beam.GroupByKey('gbk') map2 = beam.Map('map2', lambda (x, ones): (x, sum(ones))) t = (map1 | gbk | map2) - result = pipeline | beam.Create('start', ['a', 'a', 'b']) | t + result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels) 
self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels) self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels) @@ -592,7 +592,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_apply_custom_transform_without_label(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('pcoll', [1, 2, 3]) + pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) custom = PTransformLabelsTest.CustomTransform() result = pipeline.apply(custom, pcoll) self.assertTrue('CustomTransform' in pipeline.applied_labels) @@ -602,7 +602,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_apply_custom_transform_with_label(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('pcoll', [1, 2, 3]) + pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) custom = PTransformLabelsTest.CustomTransform('*custom*') result = pipeline.apply(custom, pcoll) self.assertTrue('*custom*' in pipeline.applied_labels) @@ -613,7 +613,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_combine_without_label(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', vals) + pcoll = pipeline | 'start' >> beam.Create(vals) combine = beam.CombineGlobally(sum) result = pcoll | combine self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels) @@ -622,7 +622,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_apply_ptransform_using_decorator(self): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('pcoll', [1, 2, 3]) + pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) sample = SamplePTransform('*sample*') _ = pcoll | sample self.assertTrue('*sample*' in pipeline.applied_labels) @@ -633,7 +633,7 @@ class PTransformLabelsTest(unittest.TestCase): def test_combine_with_label(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | beam.Create('start', vals) + pcoll = pipeline | 
'start' >> beam.Create(vals) combine = beam.CombineGlobally('*sum*', sum) result = pcoll | combine self.assertTrue('*sum*' in pipeline.applied_labels) @@ -642,7 +642,7 @@ class PTransformLabelsTest(unittest.TestCase): def check_label(self, ptransform, expected_label): pipeline = Pipeline('DirectPipelineRunner') - pipeline | beam.Create('start', [('a', 1)]) | ptransform + pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform actual_label = sorted(pipeline.applied_labels - {'start'})[0] self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label)) @@ -679,8 +679,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return [context.element + five] d = (self.p - | beam.Create('t', [1, 2, 3]).with_output_types(int) - | beam.ParDo('add', AddWithFive(), 5)) + | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'add' >> beam.ParDo(AddWithFive(), 5)) assert_that(d, equal_to([6, 7, 8])) self.p.run() @@ -694,8 +694,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('t', [1, 2, 3]).with_output_types(int) - | beam.ParDo('upper', ToUpperCaseWithPrefix(), 'hello')) + | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello')) self.assertEqual("Type hint violation for 'upper': " "requires <type 'str'> but got <type 'int'> for context", @@ -711,8 +711,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return [context.element + num] d = (self.p - | beam.Create('t', [1, 2, 3]).with_output_types(int) - | beam.ParDo('add', AddWithNum(), 5)) + | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'add' >> beam.ParDo(AddWithNum(), 5)) assert_that(d, equal_to([6, 7, 8])) self.p.run() @@ -728,8 +728,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('t', ['1', '2', '3']).with_output_types(str) - | beam.ParDo('add', AddWithNum(), 
5)) + | 't' >> beam.Create(['1', '2', '3']).with_output_types(str) + | 'add' >> beam.ParDo(AddWithNum(), 5)) self.p.run() self.assertEqual("Type hint violation for 'add': " @@ -746,8 +746,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # will receive a str instead, which should result in a raised exception. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('s', ['b', 'a', 'r']).with_output_types(str) - | beam.FlatMap('to str', int_to_str)) + | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str) + | 'to str' >> beam.FlatMap(int_to_str)) self.assertEqual("Type hint violation for 'to str': " "requires <type 'int'> but got <type 'str'> for a", @@ -761,8 +761,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # If this type-checks than no error should be raised. d = (self.p - | beam.Create('t', ['t', 'e', 's', 't']).with_output_types(str) - | beam.FlatMap('case', to_all_upper_case)) + | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'case' >> beam.FlatMap(to_all_upper_case)) assert_that(d, equal_to(['T', 'E', 'S', 'T'])) self.p.run() @@ -775,10 +775,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # expecting pcoll's of type str instead. 
with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str) - | (beam.FlatMap('score', lambda x: [1] if x == 't' else [2]) + | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2]) .with_input_types(str).with_output_types(int)) - | (beam.FlatMap('upper', lambda x: [x.upper()]) + | 'upper' >> (beam.FlatMap(lambda x: [x.upper()]) .with_input_types(str).with_output_types(str))) self.assertEqual("Type hint violation for 'upper': " @@ -788,10 +788,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_pardo_properly_type_checks_using_type_hint_methods(self): # Pipeline should be created successfully without an error d = (self.p - | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str) - | beam.FlatMap('dup', lambda x: [x + x]) + | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'dup' >> beam.FlatMap(lambda x: [x + x]) .with_input_types(str).with_output_types(str) - | beam.FlatMap('upper', lambda x: [x.upper()]) + | 'upper' >> beam.FlatMap(lambda x: [x.upper()]) .with_input_types(str).with_output_types(str)) assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT'])) @@ -802,8 +802,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # int's, while Map is expecting one of str. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('s', [1, 2, 3, 4]).with_output_types(int) - | beam.Map('upper', lambda x: x.upper()) + | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'upper' >> beam.Map(lambda x: x.upper()) .with_input_types(str).with_output_types(str)) self.assertEqual("Type hint violation for 'upper': " @@ -813,8 +813,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_map_properly_type_checks_using_type_hints_methods(self): # No error should be raised if this type-checks properly. 
d = (self.p - | beam.Create('s', [1, 2, 3, 4]).with_output_types(int) - | beam.Map('to_str', lambda x: str(x)) + | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'to_str' >> beam.Map(lambda x: str(x)) .with_input_types(int).with_output_types(str)) assert_that(d, equal_to(['1', '2', '3', '4'])) self.p.run() @@ -829,8 +829,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # However, 'Map' should detect that Create has hinted an int instead. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('s', [1, 2, 3, 4]).with_output_types(int) - | beam.Map('upper', upper)) + | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'upper' >> beam.Map(upper)) self.assertEqual("Type hint violation for 'upper': " "requires <type 'str'> but got <type 'int'> for s", @@ -844,8 +844,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # If this type-checks than no error should be raised. d = (self.p - | beam.Create('bools', [True, False, True]).with_output_types(bool) - | beam.Map('to_ints', bool_to_int)) + | 'bools' >> beam.Create([True, False, True]).with_output_types(bool) + | 'to_ints' >> beam.Map(bool_to_int)) assert_that(d, equal_to([1, 0, 1])) self.p.run() @@ -854,10 +854,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # incoming. 
with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str) - | beam.Map('lower', lambda x: x.lower()) + | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) + | 'lower' >> beam.Map(lambda x: x.lower()) .with_input_types(str).with_output_types(str) - | beam.Filter('below 3', lambda x: x < 3).with_input_types(int)) + | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) self.assertEqual("Type hint violation for 'below 3': " "requires <type 'int'> but got <type 'str'> for x", @@ -866,10 +866,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_filter_type_checks_using_type_hints_method(self): # No error should be raised if this type-checks properly. d = (self.p - | beam.Create('strs', ['1', '2', '3', '4', '5']).with_output_types(str) - | beam.Map('to int', lambda x: int(x)) + | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) + | 'to int' >> beam.Map(lambda x: int(x)) .with_input_types(str).with_output_types(int) - | beam.Filter('below 3', lambda x: x < 3).with_input_types(int)) + | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) assert_that(d, equal_to([1, 2])) self.p.run() @@ -881,8 +881,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Func above was hinted to only take a float, yet an int will be passed. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('ints', [1, 2, 3, 4]).with_output_types(int) - | beam.Filter('half', more_than_half)) + | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'half' >> beam.Filter(more_than_half)) self.assertEqual("Type hint violation for 'half': " "requires <type 'float'> but got <type 'int'> for a", @@ -896,17 +896,17 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Filter should deduce that it returns the same type that it takes. 
(self.p - | beam.Create('str', range(5)).with_output_types(int) - | beam.Filter('half', half) - | beam.Map('to bool', lambda x: bool(x)) + | 'str' >> beam.Create(range(5)).with_output_types(int) + | 'half' >> beam.Filter(half) + | 'to bool' >> beam.Map(lambda x: bool(x)) .with_input_types(int).with_output_types(bool)) def test_group_by_key_only_output_type_deduction(self): d = (self.p - | beam.Create('str', ['t', 'e', 's', 't']).with_output_types(str) - | (beam.Map('pair', lambda x: (x, ord(x))) + | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'pair' >> (beam.Map(lambda x: (x, ord(x))) .with_output_types(typehints.KV[str, str])) - | beam.GroupByKeyOnly('O')) + | 'O' >> beam.GroupByKeyOnly()) # Output type should correctly be deduced. # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]]. @@ -915,10 +915,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_group_by_key_output_type_deduction(self): d = (self.p - | beam.Create('str', range(20)).with_output_types(int) - | (beam.Map('pair negative', lambda x: (x % 5, -x)) + | 'str' >> beam.Create(range(20)).with_output_types(int) + | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x)) .with_output_types(typehints.KV[int, int])) - | beam.GroupByKey('T')) + | 'T' >> beam.GroupByKey()) # Output type should correctly be deduced. # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]]. @@ -929,8 +929,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # GBK will be passed raw int's here instead of some form of KV[A, B]. 
with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('s', [1, 2, 3]).with_output_types(int) - | beam.GroupByKeyOnly('F')) + | 's' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'F' >> beam.GroupByKeyOnly()) self.assertEqual("Input type hint violation at F: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -942,9 +942,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # aliased to Tuple[int, str]. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | (beam.Create('s', range(5)) + | 's' >> (beam.Create(range(5)) .with_output_types(typehints.Iterable[int])) - | beam.GroupByKey('T')) + | 'T' >> beam.GroupByKey()) self.assertEqual("Input type hint violation at T: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -958,8 +958,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # information to the ParDo. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('nums', range(5)) - | beam.FlatMap('mod dup', lambda x: (x % 2, x))) + | 'nums' >> beam.Create(range(5)) + | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x))) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform Create(nums)', @@ -971,9 +971,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # information to GBK-only. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('nums', range(5)).with_output_types(int) - | beam.Map('mod dup', lambda x: (x % 2, x)) - | beam.GroupByKeyOnly('G')) + | 'nums' >> beam.Create(range(5)).with_output_types(int) + | 'mod dup' >> beam.Map(lambda x: (x % 2, x)) + | 'G' >> beam.GroupByKeyOnly()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' @@ -986,8 +986,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # The pipeline below should raise a TypeError, however pipeline type # checking was disabled above. 
(self.p - | beam.Create('t', [1, 2, 3]).with_output_types(int) - | beam.Map('lower', lambda x: x.lower()) + | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'lower' >> beam.Map(lambda x: x.lower()) .with_input_types(str).with_output_types(str)) def test_run_time_type_checking_enabled_type_violation(self): @@ -1002,8 +1002,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Function above has been type-hinted to only accept an int. But in the # pipeline execution it'll be passed a string due to the output of Create. (self.p - | beam.Create('t', ['some_string']) - | beam.Map('to str', int_to_string)) + | 't' >> beam.Create(['some_string']) + | 'to str' >> beam.Map(int_to_string)) with self.assertRaises(typehints.TypeCheckError) as e: self.p.run() @@ -1026,10 +1026,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Pipeline checking is off, but the above function should satisfy types at # run-time. result = (self.p - | beam.Create('t', ['t', 'e', 's', 't', 'i', 'n', 'g']) + | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g']) .with_output_types(str) - | beam.Map('gen keys', group_with_upper_ord) - | beam.GroupByKey('O')) + | 'gen keys' >> beam.Map(group_with_upper_ord) + | 'O' >> beam.GroupByKey()) assert_that(result, equal_to([(1, ['g']), (3, ['s', 'i', 'n']), @@ -1048,9 +1048,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return (a % 2, a) (self.p - | beam.Create('nums', range(5)).with_output_types(int) - | beam.Map('is even', is_even_as_key) - | beam.GroupByKey('parity')) + | 'nums' >> beam.Create(range(5)).with_output_types(int) + | 'is even' >> beam.Map(is_even_as_key) + | 'parity' >> beam.GroupByKey()) # Although all the types appear to be correct when checked at pipeline # construction. 
Runtime type-checking should detect the 'is_even_as_key' is @@ -1077,9 +1077,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return (a % 2 == 0, a) result = (self.p - | beam.Create('nums', range(5)).with_output_types(int) - | beam.Map('is even', is_even_as_key) - | beam.GroupByKey('parity')) + | 'nums' >> beam.Create(range(5)).with_output_types(int) + | 'is even' >> beam.Map(is_even_as_key) + | 'parity' >> beam.GroupByKey()) assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])])) self.p.run() @@ -1092,8 +1092,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # ParDo should receive an instance of type 'str', however an 'int' will be # passed instead. with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | beam.Create('n', [1, 2, 3]) - | (beam.FlatMap('to int', lambda x: [int(x)]) + (self.p | 'n' >> beam.Create([1, 2, 3]) + | 'to int' >> (beam.FlatMap(lambda x: [int(x)]) .with_input_types(str).with_output_types(int)) ) self.p.run() @@ -1111,8 +1111,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)]) - | (beam.FlatMap('add', lambda (x, y): [x + y]) + | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) + | 'add' >> (beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, int]).with_output_types(int)) ) self.p.run() @@ -1136,8 +1136,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): lambda x: [float(x)]).with_input_types(int).with_output_types( int).get_type_hints() with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | beam.Create('n', [1, 2, 3]) - | (beam.FlatMap('to int', lambda x: [float(x)]) + (self.p | 'n' >> beam.Create([1, 2, 3]) + | 'to int' >> (beam.FlatMap(lambda x: [float(x)]) .with_input_types(int).with_output_types(int)) ) self.p.run() @@ -1159,8 +1159,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # of 'int' will be generated instead. 
with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('n', [(1, 3.0), (2, 4.9), (3, 9.5)]) - | (beam.FlatMap('swap', lambda (x, y): [x + y]) + | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) + | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, float]) .with_output_types(typehints.Tuple[float, int])) ) @@ -1183,7 +1183,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return a + b with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | beam.Create('t', [1, 2, 3, 4]) | beam.Map('add 1', add, 1.0)) + (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0)) self.p.run() self.assertStartswith( @@ -1199,8 +1199,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('t', [1, 2, 3, 4]) - | (beam.Map('add 1', lambda x, one: x + one, 1.0) + | 't' >> beam.Create([1, 2, 3, 4]) + | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0) .with_input_types(int, int) .with_output_types(float))) self.p.run() @@ -1219,8 +1219,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return sum(ints) d = (self.p - | beam.Create('t', [1, 2, 3]).with_output_types(int) - | beam.CombineGlobally('sum', sum_ints)) + | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'sum' >> beam.CombineGlobally(sum_ints)) self.assertEqual(int, d.element_type) assert_that(d, equal_to([6])) @@ -1234,8 +1234,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('m', [1, 2, 3]).with_output_types(int) - | beam.CombineGlobally('add', bad_combine)) + | 'm' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'add' >> beam.CombineGlobally(bad_combine)) self.assertEqual( "All functions for a Combine PTransform must accept a " @@ -1255,9 +1255,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return list(range(n+1)) d = (self.p - | 
beam.Create('t', [1, 2, 3]).with_output_types(int) - | beam.CombineGlobally('sum', sum_ints) - | beam.ParDo('range', range_from_zero)) + | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'sum' >> beam.CombineGlobally(sum_ints) + | 'range' >> beam.ParDo(range_from_zero)) self.assertEqual(int, d.element_type) assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6])) @@ -1272,8 +1272,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return reduce(operator.mul, ints, 1) d = (self.p - | beam.Create('k', [5, 5, 5, 5]).with_output_types(int) - | beam.CombineGlobally('mul', iter_mul)) + | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int) + | 'mul' >> beam.CombineGlobally(iter_mul)) assert_that(d, equal_to([625])) self.p.run() @@ -1290,8 +1290,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('k', [5, 5, 5, 5]).with_output_types(int) - | beam.CombineGlobally('mul', iter_mul)) + | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int) + | 'mul' >> beam.CombineGlobally(iter_mul)) self.p.run() self.assertStartswith( @@ -1305,8 +1305,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_combine_pipeline_type_check_using_methods(self): d = (self.p - | beam.Create('s', ['t', 'e', 's', 't']).with_output_types(str) - | (beam.CombineGlobally('concat', lambda s: ''.join(s)) + | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s)) .with_input_types(str).with_output_types(str))) def matcher(expected): @@ -1321,8 +1321,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('s', range(5)).with_output_types(int) - | (beam.CombineGlobally('sum', lambda s: sum(s)) + | 's' >> beam.Create(range(5)).with_output_types(int) + | 'sum' >> (beam.CombineGlobally(lambda s: sum(s)) .with_input_types(int).with_output_types(int))) 
assert_that(d, equal_to([10])) @@ -1331,8 +1331,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_combine_pipeline_type_check_violation_using_methods(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('e', range(3)).with_output_types(int) - | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s))) + | 'e' >> beam.Create(range(3)).with_output_types(int) + | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) self.assertEqual("Input type hint violation at sort join: " @@ -1345,8 +1345,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('e', range(3)).with_output_types(int) - | (beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s))) + | 'e' >> beam.Create(range(3)).with_output_types(int) + | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) self.p.run() @@ -1363,9 +1363,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('e', range(3)).with_output_types(int) - | beam.CombineGlobally('sort join', lambda s: ''.join(sorted(s))) - | beam.Map('f', lambda x: x + 1)) + | 'e' >> beam.Create(range(3)).with_output_types(int) + | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | 'f' >> beam.Map(lambda x: x + 1)) self.assertEqual( 'Pipeline type checking is enabled, ' @@ -1375,8 +1375,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_globally_pipeline_checking_satisfied(self): d = (self.p - | beam.Create('c', range(5)).with_output_types(int) - | combine.Mean.Globally('mean')) + | 'c' >> beam.Create(range(5)).with_output_types(int) + | 'mean' >> combine.Mean.Globally()) self.assertTrue(d.element_type is float) assert_that(d, equal_to([2.0])) @@ -1385,8 +1385,8 @@ class 
PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_globally_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('c', ['test']).with_output_types(str) - | combine.Mean.Globally('mean')) + | 'c' >> beam.Create(['test']).with_output_types(str) + | 'mean' >> combine.Mean.Globally()) self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " "requires Tuple[TypeVariable[K], " @@ -1398,8 +1398,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('c', range(5)).with_output_types(int) - | combine.Mean.Globally('mean')) + | 'c' >> beam.Create(range(5)).with_output_types(int) + | 'mean' >> combine.Mean.Globally()) self.assertTrue(d.element_type is float) assert_that(d, equal_to([2.0])) @@ -1411,8 +1411,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str) - | combine.Mean.Globally('mean')) + | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'mean' >> combine.Mean.Globally()) self.p.run() self.assertEqual("Runtime type violation detected for transform input " "when executing ParDoFlatMap(Combine): Tuple[Any, " @@ -1427,10 +1427,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_per_key_pipeline_checking_satisfied(self): d = (self.p - | beam.Create('c', range(5)).with_output_types(int) - | (beam.Map('even group', lambda x: (not x % 2, x)) + | 'c' >> beam.Create(range(5)).with_output_types(int) + | 'even group' >> (beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) - | combine.Mean.PerKey('even mean')) + | 'even mean' >> combine.Mean.PerKey()) self.assertCompatible(typehints.KV[bool, float], d.element_type) assert_that(d, equal_to([(False, 2.0), (True, 2.0)])) @@ -1439,10 +1439,10 @@ class 
PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_per_key_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('e', map(str, range(5))).with_output_types(str) - | (beam.Map('upper pair', lambda x: (x.upper(), x)) + | 'e' >> beam.Create(map(str, range(5))).with_output_types(str) + | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x)) .with_output_types(typehints.KV[str, str])) - | combine.Mean.PerKey('even mean')) + | 'even mean' >> combine.Mean.PerKey()) self.p.run() self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " @@ -1455,10 +1455,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('c', range(5)).with_output_types(int) - | (beam.Map('odd group', lambda x: (bool(x % 2), x)) + | 'c' >> beam.Create(range(5)).with_output_types(int) + | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x)) .with_output_types(typehints.KV[bool, int])) - | combine.Mean.PerKey('odd mean')) + | 'odd mean' >> combine.Mean.PerKey()) self.assertCompatible(typehints.KV[bool, float], d.element_type) assert_that(d, equal_to([(False, 2.0), (True, 2.0)])) @@ -1470,10 +1470,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('c', range(5)).with_output_types(int) - | (beam.Map('odd group', lambda x: (x, str(bool(x % 2)))) + | 'c' >> beam.Create(range(5)).with_output_types(int) + | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2)))) .with_output_types(typehints.KV[int, str])) - | combine.Mean.PerKey('odd mean')) + | 'odd mean' >> combine.Mean.PerKey()) self.p.run() self.assertStartswith( @@ -1494,8 +1494,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_globally_pipeline_type_checking_satisfied(self): d = (self.p - | beam.Create('p', range(5)).with_output_types(int) - | combine.Count.Globally('count 
int')) + | 'p' >> beam.Create(range(5)).with_output_types(int) + | 'count int' >> combine.Count.Globally()) self.assertTrue(d.element_type is int) assert_that(d, equal_to([5])) @@ -1505,8 +1505,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('p', range(5)).with_output_types(int) - | combine.Count.Globally('count int')) + | 'p' >> beam.Create(range(5)).with_output_types(int) + | 'count int' >> combine.Count.Globally()) self.assertTrue(d.element_type is int) assert_that(d, equal_to([5])) @@ -1514,10 +1514,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perkey_pipeline_type_checking_satisfied(self): d = (self.p - | beam.Create('p', range(5)).with_output_types(int) - | (beam.Map('even group', lambda x: (not x % 2, x)) + | 'p' >> beam.Create(range(5)).with_output_types(int) + | 'even group' >> (beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) - | combine.Count.PerKey('count int')) + | 'count int' >> combine.Count.PerKey()) self.assertCompatible(typehints.KV[bool, int], d.element_type) assert_that(d, equal_to([(False, 2), (True, 3)])) @@ -1526,8 +1526,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perkey_pipeline_type_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('p', range(5)).with_output_types(int) - | combine.Count.PerKey('count int')) + | 'p' >> beam.Create(range(5)).with_output_types(int) + | 'count int' >> combine.Count.PerKey()) self.assertEqual("Input type hint violation at GroupByKey: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1538,10 +1538,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('c', ['t', 'e', 's', 't']).with_output_types(str) - | beam.Map('dup key', lambda x: (x, x)) + | 'c' >> beam.Create(['t', 'e', 
's', 't']).with_output_types(str) + | 'dup key' >> beam.Map(lambda x: (x, x)) .with_output_types(typehints.KV[str, str]) - | combine.Count.PerKey('count dups')) + | 'count dups' >> combine.Count.PerKey()) self.assertCompatible(typehints.KV[str, int], d.element_type) assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)])) @@ -1549,8 +1549,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perelement_pipeline_type_checking_satisfied(self): d = (self.p - | beam.Create('w', [1, 1, 2, 3]).with_output_types(int) - | combine.Count.PerElement('count elems')) + | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int) + | 'count elems' >> combine.Count.PerElement()) self.assertCompatible(typehints.KV[int, int], d.element_type) assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)])) @@ -1561,8 +1561,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('f', [1, 1, 2, 3]) - | combine.Count.PerElement('count elems')) + | 'f' >> beam.Create([1, 1, 2, 3]) + | 'count elems' >> combine.Count.PerElement()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' @@ -1573,9 +1573,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('w', [True, True, False, True, True]) + | 'w' >> beam.Create([True, True, False, True, True]) .with_output_types(bool) - | combine.Count.PerElement('count elems')) + | 'count elems' >> combine.Count.PerElement()) self.assertCompatible(typehints.KV[bool, int], d.element_type) assert_that(d, equal_to([(False, 1), (True, 4)])) @@ -1583,8 +1583,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_top_of_pipeline_checking_satisfied(self): d = (self.p - | beam.Create('n', range(5, 11)).with_output_types(int) - | combine.Top.Of('top 3', 3, lambda x, y: x < y)) + | 'n' >> beam.Create(range(5, 
11)).with_output_types(int) + | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1595,8 +1595,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('n', list('testing')).with_output_types(str) - | combine.Top.Of('acii top', 3, lambda x, y: x < y)) + | 'n' >> beam.Create(list('testing')).with_output_types(str) + | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[str], d.element_type) assert_that(d, equal_to([['t', 't', 's']])) @@ -1605,9 +1605,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_per_key_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('n', range(100)).with_output_types(int) - | beam.Map('num + 1', lambda x: x + 1).with_output_types(int) - | combine.Top.PerKey('top mod', 1, lambda a, b: a < b)) + | 'n' >> beam.Create(range(100)).with_output_types(int) + | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int) + | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertEqual("Input type hint violation at GroupByKey: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1616,10 +1616,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_per_key_pipeline_checking_satisfied(self): d = (self.p - | beam.Create('n', range(100)).with_output_types(int) - | (beam.Map('group mod 3', lambda x: (x % 3, x)) + | 'n' >> beam.Create(range(100)).with_output_types(int) + | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) - | combine.Top.PerKey('top mod', 1, lambda a, b: a < b)) + | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]], d.element_type) @@ -1630,10 +1630,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): 
self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('n', range(21)) - | (beam.Map('group mod 3', lambda x: (x % 3, x)) + | 'n' >> beam.Create(range(21)) + | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) - | combine.Top.PerKey('top mod', 1, lambda a, b: a < b)) + | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1642,8 +1642,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_sample_globally_pipeline_satisfied(self): d = (self.p - | beam.Create('m', [2, 2, 3, 3]).with_output_types(int) - | combine.Sample.FixedSizeGlobally('sample', 3)) + | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int) + | 'sample' >> combine.Sample.FixedSizeGlobally(3)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1658,8 +1658,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('m', [2, 2, 3, 3]).with_output_types(int) - | combine.Sample.FixedSizeGlobally('sample', 2)) + | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int) + | 'sample' >> combine.Sample.FixedSizeGlobally(2)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1672,9 +1672,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_sample_per_key_pipeline_satisfied(self): d = (self.p - | (beam.Create('m', [(1, 2), (1, 2), (2, 3), (2, 3)]) + | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) - | combine.Sample.FixedSizePerKey('sample', 2)) + | 'sample' >> combine.Sample.FixedSizePerKey(2)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1691,9 +1691,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | (beam.Create('m', [(1, 
2), (1, 2), (2, 3), (2, 3)]) + | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) - | combine.Sample.FixedSizePerKey('sample', 1)) + | 'sample' >> combine.Sample.FixedSizePerKey(1)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1708,8 +1708,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_to_list_pipeline_check_satisfied(self): d = (self.p - | beam.Create('c', (1, 2, 3, 4)).with_output_types(int) - | combine.ToList('to list')) + | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int) + | 'to list' >> combine.ToList()) self.assertCompatible(typehints.List[int], d.element_type) @@ -1724,8 +1724,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | beam.Create('c', list('test')).with_output_types(str) - | combine.ToList('to list')) + | 'c' >> beam.Create(list('test')).with_output_types(str) + | 'to list' >> combine.ToList()) self.assertCompatible(typehints.List[str], d.element_type) @@ -1739,8 +1739,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_to_dict_pipeline_check_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create('d', [1, 2, 3, 4]).with_output_types(int) - | combine.ToDict('to dict')) + | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'to dict' >> combine.ToDict()) self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " "requires Tuple[TypeVariable[K], " @@ -1753,7 +1753,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): | beam.Create( 'd', [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int]) - | combine.ToDict('to dict')) + | 'to dict' >> combine.ToDict()) self.assertCompatible(typehints.Dict[int, int], d.element_type) assert_that(d, equal_to([{1: 2, 3: 4}])) @@ -1763,9 +1763,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): 
self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | (beam.Create('d', [('1', 2), ('3', 4)]) + | 'd' >> (beam.Create([('1', 2), ('3', 4)]) .with_output_types(typehints.Tuple[str, int])) - | combine.ToDict('to dict')) + | 'to dict' >> combine.ToDict()) self.assertCompatible(typehints.Dict[str, int], d.element_type) assert_that(d, equal_to([{'1': 2, '3': 4}])) @@ -1776,8 +1776,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(TypeError) as e: (self.p - | beam.Create('t', [1, 2, 3]).with_output_types(int) - | beam.Map('len', lambda x: len(x)).with_output_types(int)) + | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int)) self.p.run() # Our special type-checking related TypeError shouldn't have been raised. @@ -1799,8 +1799,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int])) def test_pipeline_inference(self): - created = self.p | beam.Create('c', ['a', 'b', 'c']) - mapped = created | beam.Map('pair with 1', lambda x: (x, 1)) + created = self.p | 'c' >> beam.Create(['a', 'b', 'c']) + mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1)) grouped = mapped | beam.GroupByKey() self.assertEqual(str, created.element_type) self.assertEqual(typehints.KV[str, int], mapped.element_type) @@ -1810,8 +1810,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_inferred_bad_kv_type(self): with self.assertRaises(typehints.TypeCheckError) as e: _ = (self.p - | beam.Create('t', ['a', 'b', 'c']) - | beam.Map('ungroupable', lambda x: (x, 0, 1.0)) + | 't' >> beam.Create(['a', 'b', 'c']) + | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0)) | beam.GroupByKey()) self.assertEqual('Input type hint violation at GroupByKey: ' @@ -1821,11 +1821,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_type_inference_command_line_flag_toggle(self): 
self.p.options.view_as(TypeOptions).pipeline_type_check = False - x = self.p | beam.Create('t', [1, 2, 3, 4]) + x = self.p | 't' >> beam.Create([1, 2, 3, 4]) self.assertIsNone(x.element_type) self.p.options.view_as(TypeOptions).pipeline_type_check = True - x = self.p | beam.Create('m', [1, 2, 3, 4]) + x = self.p | 'm' >> beam.Create([1, 2, 3, 4]) self.assertEqual(int, x.element_type) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index b7a121d..bbb7787 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -160,9 +160,9 @@ def KvSwap(label='KvSwap'): # pylint: disable=invalid-name def RemoveDuplicates(pcoll): # pylint: disable=invalid-name """Produces a PCollection containing the unique elements of a PCollection.""" return (pcoll - | Map('ToPairs', lambda v: (v, None)) - | CombinePerKey('Group', lambda vs: None) - | Keys('RemoveDuplicates')) + | 'ToPairs' >> Map(lambda v: (v, None)) + | 'Group' >> CombinePerKey(lambda vs: None) + | 'RemoveDuplicates' >> Keys()) class DataflowAssertException(Exception): @@ -220,7 +220,7 @@ def assert_that(actual, matcher, label='assert_that'): class AssertThat(PTransform): def apply(self, pipeline): - return pipeline | Create('singleton', [None]) | Map(match, AllOf(actual)) + return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual)) def default_label(self): return label http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 186fcd4..012dde4 100644 --- 
a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -127,14 +127,14 @@ class WindowTest(unittest.TestCase): self.assertEqual([IntervalWindow(2, 25)], merge(2, 15, 10)) def timestamped_key_values(self, pipeline, key, *timestamps): - return (pipeline | Create('start', timestamps) + return (pipeline | 'start' >> Create(timestamps) | Map(lambda x: WindowedValue((key, x), x, []))) def test_sliding_windows(self): p = Pipeline('DirectPipelineRunner') pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3) result = (pcoll - | WindowInto('w', SlidingWindows(period=2, size=4)) + | 'w' >> WindowInto(SlidingWindows(period=2, size=4)) | GroupByKey() | reify_windows) expected = [('key @ [-2.0, 2.0)', [1]), @@ -147,7 +147,7 @@ class WindowTest(unittest.TestCase): p = Pipeline('DirectPipelineRunner') pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27) result = (pcoll - | WindowInto('w', Sessions(10)) + | 'w' >> WindowInto(Sessions(10)) | GroupByKey() | sort_values | reify_windows) @@ -159,9 +159,9 @@ class WindowTest(unittest.TestCase): def test_timestamped_value(self): p = Pipeline('DirectPipelineRunner') result = (p - | Create('start', [(k, k) for k in range(10)]) + | 'start' >> Create([(k, k) for k in range(10)]) | Map(lambda (x, t): TimestampedValue(x, t)) - | WindowInto('w', FixedWindows(5)) + | 'w' >> WindowInto(FixedWindows(5)) | Map(lambda v: ('key', v)) | GroupByKey()) assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]), @@ -172,13 +172,13 @@ class WindowTest(unittest.TestCase): p = Pipeline('DirectPipelineRunner') result = (p # Create some initial test values. - | Create('start', [(k, k) for k in range(10)]) + | 'start' >> Create([(k, k) for k in range(10)]) # The purpose of the WindowInto transform is to establish a # FixedWindows windowing function for the PCollection. 
# It does not bucket elements into windows since the timestamps # from Create are not spaced 5 ms apart and very likely they all # fall into the same window. - | WindowInto('w', FixedWindows(5)) + | 'w' >> WindowInto(FixedWindows(5)) # Generate timestamped values using the values as timestamps. # Now there are values 5 ms apart and since Map propagates the # windowing function from input to output the output PCollection http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/write_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index 51163bc..af3668c 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -100,7 +100,7 @@ class WriteTest(unittest.TestCase): write_to_test_sink = WriteToTestSink(return_init_result, return_write_results) p = Pipeline(options=PipelineOptions([])) - result = p | beam.Create('start', data) | write_to_test_sink + result = p | 'start' >> beam.Create(data) | write_to_test_sink assert_that(result, is_empty()) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index b25f231..4e1ab68 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -80,7 +80,7 @@ class MainInputTest(unittest.TestCase): ['a', 'b', 'c'] | beam.ParDo(MyDoFn()) with self.assertRaises(typehints.TypeCheckError): - [1, 2, 3] | (beam.ParDo(MyDoFn()) | beam.ParDo('again', MyDoFn())) + [1, 2, 3] | (beam.ParDo(MyDoFn()) | 
'again' >> beam.ParDo(MyDoFn())) def test_typed_dofn_instance(self): class MyDoFn(beam.DoFn): @@ -95,7 +95,7 @@ class MainInputTest(unittest.TestCase): ['a', 'b', 'c'] | beam.ParDo(my_do_fn) with self.assertRaises(typehints.TypeCheckError): - [1, 2, 3] | (beam.ParDo(my_do_fn) | beam.ParDo('again', my_do_fn)) + [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn)) class SideInputTest(unittest.TestCase): @@ -170,14 +170,14 @@ class SideInputTest(unittest.TestCase): return s * times p = beam.Pipeline(options=PipelineOptions([])) main_input = p | beam.Create(['a', 'bb', 'c']) - side_input = p | beam.Create('side', [3]) + side_input = p | 'side' >> beam.Create([3]) result = main_input | beam.Map(repeat, pvalue.AsSingleton(side_input)) assert_that(result, equal_to(['aaa', 'bbbbbb', 'ccc'])) p.run() - bad_side_input = p | beam.Create('bad_side', ['z']) + bad_side_input = p | 'bad_side' >> beam.Create(['z']) with self.assertRaises(typehints.TypeCheckError): - main_input | beam.Map('again', repeat, pvalue.AsSingleton(bad_side_input)) + main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input)) def test_deferred_side_input_iterable(self): @typehints.with_input_types(str, typehints.Iterable[str]) @@ -185,14 +185,14 @@ class SideInputTest(unittest.TestCase): return glue.join(sorted(items)) p = beam.Pipeline(options=PipelineOptions([])) main_input = p | beam.Create(['a', 'bb', 'c']) - side_input = p | beam.Create('side', ['x', 'y', 'z']) + side_input = p | 'side' >> beam.Create(['x', 'y', 'z']) result = main_input | beam.Map(concat, pvalue.AsIter(side_input)) assert_that(result, equal_to(['xayaz', 'xbbybbz', 'xcycz'])) p.run() - bad_side_input = p | beam.Create('bad_side', [1, 2, 3]) + bad_side_input = p | 'bad_side' >> beam.Create([1, 2, 3]) with self.assertRaises(typehints.TypeCheckError): - main_input | beam.Map('fail', concat, pvalue.AsIter(bad_side_input)) + main_input | 'fail' >> beam.Map(concat, pvalue.AsIter(bad_side_input)) class 
CustomTransformTest(unittest.TestCase):