Lint fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15d35ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15d35ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15d35ca Branch: refs/heads/python-sdk Commit: b15d35ca6e585e75153e05d96403336889cc6894 Parents: 2a59a12 Author: Robert Bradshaw <rober...@google.com> Authored: Fri Jul 22 18:35:22 2016 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Sat Jul 23 16:43:46 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/dataflow_test.py | 7 +- .../complete/juliaset/juliaset/juliaset.py | 9 +- .../apache_beam/examples/complete/tfidf_test.py | 2 +- .../examples/cookbook/bigquery_side_input.py | 14 +- .../cookbook/bigquery_side_input_test.py | 4 +- .../examples/cookbook/bigquery_tornadoes.py | 6 +- .../apache_beam/examples/cookbook/bigshuffle.py | 5 +- .../apache_beam/examples/cookbook/filters.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 2 +- .../examples/snippets/snippets_test.py | 8 +- sdks/python/apache_beam/examples/wordcount.py | 2 +- .../apache_beam/examples/wordcount_debugging.py | 2 +- .../apache_beam/examples/wordcount_minimal.py | 2 +- sdks/python/apache_beam/io/bigquery.py | 4 +- sdks/python/apache_beam/pipeline_test.py | 4 +- .../apache_beam/transforms/combiners_test.py | 4 +- sdks/python/apache_beam/transforms/core.py | 7 +- .../apache_beam/transforms/ptransform_test.py | 161 ++++++++++--------- sdks/python/apache_beam/transforms/util.py | 3 +- .../typehints/typed_pipeline_test.py | 2 +- 20 files changed, 127 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/dataflow_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index bf66851..cc3a526 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -114,8 +114,8 @@ class DataflowTest(unittest.TestCase): words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in - result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix, - suffix=AsSingleton(suffix)) + result = words | 'DecorateWordsDoFn' >> ParDo( + SomeDoFn(), prefix, suffix=AsSingleton(suffix)) assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() @@ -179,8 +179,7 @@ class DataflowTest(unittest.TestCase): pipeline = Pipeline('DirectPipelineRunner') pcol = pipeline | 'start' >> Create([1, 2]) side = pipeline | 'side' >> Create([]) # 0 values in side input. - result = ( - pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))) + result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)) assert_that(result, equal_to([10, 20])) pipeline.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 56696c3..1445fbe 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -105,11 +105,12 @@ def run(argv=None): # pylint: disable=missing-docstring # Group each coordinate triplet by its x value, then write the coordinates to # the output file with an x-coordinate grouping per line. # pylint: disable=expression-not-assigned - (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) - | 'x coord' >> beam.GroupByKey() | beam.Map( - 'format', + (coordinates + | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) + | 'x coord' >> beam.GroupByKey() + | 'format' >> beam.Map( lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) - | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output))) + | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output))) # pylint: enable=expression-not-assigned p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index ee7e534..f30b832 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -56,7 +56,7 @@ class TfIdfTest(unittest.TestCase): result = ( uri_to_line | tfidf.TfIdf() - | 'flatten' >> beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) + | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS)) # Run the pipeline. Note that the assert_that above adds to the pipeline # a check that the result PCollection contains expected values. To actually http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py index 1db4a1e..2099e48 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py @@ -101,11 +101,12 @@ def run(argv=None): ignore_corpus = known_args.ignore_corpus ignore_word = known_args.ignore_word - pcoll_corpus = p | beam.Read('read corpus', - beam.io.BigQuerySource(query=query_corpus)) - pcoll_word = p | beam.Read('read words', - beam.io.BigQuerySource(query=query_word)) - pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create([ignore_corpus]) + pcoll_corpus = p | 'read corpus' >> beam.io.Read( + beam.io.BigQuerySource(query=query_corpus)) + pcoll_word = p | 'read_words' >> beam.Read( + beam.io.BigQuerySource(query=query_word)) + pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create( + [ignore_corpus]) pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word]) pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids) @@ -113,8 +114,7 @@ def run(argv=None): pcoll_ignore_corpus, pcoll_ignore_word) # pylint:disable=expression-not-assigned - pcoll_groups | beam.io.Write('WriteToText', - beam.io.TextFileSink(known_args.output)) + pcoll_groups | beam.io.Write(beam.io.TextFileSink(known_args.output)) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 215aafa..e2b20f3 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -35,8 +35,8 @@ class BigQuerySideInputTest(unittest.TestCase): {'f': 'corpus2'}, {'f': 'corpus3'}]) words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'}, - {'f': 'word2'}, - {'f': 'word3'}]) + {'f': 'word2'}, + {'f': 'word3'}]) ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1']) ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1']) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py index cdaee36..6e1326c 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py @@ -53,11 +53,11 @@ def count_tornadoes(input_data): """ return (input_data - | beam.FlatMap( - 'months with tornadoes', + | 'months with tornatoes' >> beam.FlatMap( lambda row: [(int(row['month']), 1)] if row['tornado'] else []) | 'monthly count' >> beam.CombinePerKey(sum) - | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v})) + | 'format' >> beam.Map( + lambda (k, v): {'month': k, 'tornado_count': v})) def run(argv=None): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/bigshuffle.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py index c29a038..f7070dc 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py +++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py @@ -59,8 +59,9 @@ def run(argv=None): # Count the occurrences of each word. output = (lines - | 'split' >> beam.Map(lambda x: (x[:10], x[10:99]) - ).with_output_types(beam.typehints.KV[str, str]) + | 'split' >> beam.Map( + lambda x: (x[:10], x[10:99])) + .with_output_types(beam.typehints.KV[str, str]) | 'group' >> beam.GroupByKey() | beam.FlatMap( 'format', http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/cookbook/filters.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index b19b566..efd0ba7 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -88,7 +88,7 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) - input_data = p | 'input' >> beam.Read(beam.io.BigQuerySource(known_args.input)) + input_data = p | beam.Read(beam.io.BigQuerySource(known_args.input)) # pylint: disable=expression-not-assigned (filter_cold_days(input_data, known_args.month_filter) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 9d1df82..9f3d6e1 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -307,7 +307,7 @@ def pipeline_options_command_line(argv): p = beam.Pipeline(argv=pipeline_args) lines = p | beam.io.Read('ReadFromText', beam.io.TextFileSource(known_args.input)) - lines | 'WriteToText' >> beam.io.Write(beam.io.TextFileSink(known_args.output)) + lines | beam.io.Write(beam.io.TextFileSink(known_args.output)) # [END pipeline_options_command_line] p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 9eba46a..edc0a17 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -101,6 +101,7 @@ class ParDoTest(unittest.TestCase): self.assertEqual({'A', 'C'}, set(all_capitals)) def test_pardo_with_label(self): + # pylint: disable=line-too-long words = ['aa', 'bbc', 'defg'] # [START model_pardo_with_label] result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word))) @@ -127,10 +128,9 @@ class ParDoTest(unittest.TestCase): small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3) # A single deferred side input. - larger_than_average = (words - | 'large' >> beam.FlatMap(filter_using_length, - lower_bound=pvalue.AsSingleton( - avg_word_len))) + larger_than_average = (words | 'large' >> beam.FlatMap( + filter_using_length, + lower_bound=pvalue.AsSingleton(avg_word_len))) # Mix and match. small_but_nontrivial = words | beam.FlatMap(filter_using_length, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 4744352..096e508 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -82,7 +82,7 @@ def run(argv=None): # Count the occurrences of each word. counts = (lines | 'split' >> (beam.ParDo(WordExtractingDoFn()) - .with_output_types(unicode)) + .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index e008b48..473a486 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -98,7 +98,7 @@ class CountWords(beam.PTransform): def apply(self, pcoll): return (pcoll | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) + .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/wordcount_minimal.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index ce5b644..4073f7b 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -98,7 +98,7 @@ def run(argv=None): # Count the occurrences of each word. counts = (lines | 'split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - .with_output_types(unicode)) + .with_output_types(unicode)) | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) | 'group' >> beam.GroupByKey() | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/io/bigquery.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index f789312..50c2eaf 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -45,8 +45,8 @@ Map transform will get on each call *one* row of the main table and *all* rows of the side table. The execution framework may use some caching techniques to share the side inputs between calls in order to avoid excessive reading:: - main_table = pipeline | 'very_big_table' >> beam.io.Read(beam.io.BigQuerySource() - side_table = pipeline | 'not_big_table' >> beam.io.Read(beam.io.BigQuerySource() + main_table = pipeline | 'very_big' >> beam.io.Read(beam.io.BigQuerySource() + side_table = pipeline | 'not_big' >> beam.io.Read(beam.io.BigQuerySource() results = ( main_table | beam.Map('process data', http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 327f26c..8a0d246 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -217,8 +217,8 @@ class PipelineTest(unittest.TestCase): dupes = ( biglist | 'oom:addone' >> Map(lambda x: (x, 1)) - | 'oom:dupes' >> FlatMap(create_dupes, - AsIter(biglist)).with_outputs('side', main='main')) + | 'oom:dupes' >> FlatMap( + create_dupes, AsIter(biglist)).with_outputs('side', main='main')) result = ( (dupes.side, dupes.main, dupes.side) | 'oom:flatten' >> Flatten() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index c970382..bfe168d 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -219,8 +219,8 @@ class CombineTest(unittest.TestCase): return main | Map(lambda _, s: s, side) p = Pipeline('DirectPipelineRunner') - result1 = p | 'label1' >> Create([]) | 'L1' >> CombineWithSideInput() - result2 = p | 'label2' >> Create([1, 2, 3, 4]) | 'L2' >> CombineWithSideInput() + result1 = p | 'i1' >> Create([]) | 'c1' >> CombineWithSideInput() + result2 = p | 'i2' >> Create([1, 2, 3, 4]) | 'c2' >> CombineWithSideInput() assert_that(result1, equal_to([0]), label='r1') assert_that(result2, equal_to([10]), label='r2') p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 44a6d29..5e6aafc 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -814,9 +814,9 @@ class CombineGlobally(PTransform): return transform combined = (pcoll - | 'KeyWithVoid' >> add_input_types(Map(lambda v: (None, v)) - .with_output_types( - KV[None, pcoll.element_type])) + | 'KeyWithVoid' >> add_input_types( + Map(lambda v: (None, v)).with_output_types( + KV[None, pcoll.element_type])) | CombinePerKey( 'CombinePerKey', self.fn, *self.args, **self.kwargs) | 'UnKey' >> Map(lambda (k, v): v)) @@ -1044,6 +1044,7 @@ class GroupByKey(PTransform): KV[key_type, Iterable[typehints.WindowedValue[value_type]]]) gbk_output_type = KV[key_type, Iterable[value_type]] + # pylint: disable=bad-continuation return (pcoll | 'reify_windows' >> (ParDo(self.ReifyWindows()) .with_output_types(reify_output_type)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 3a71ec3..992f944 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -288,7 +288,7 @@ class PTransformTest(unittest.TestCase): vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = Pipeline('DirectPipelineRunner') pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + - [('b', x) for x in vals_2])) + [('b', x) for x in vals_2])) result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn()) assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)), ('b', sum(vals_2) / len(vals_2))])) @@ -299,7 +299,7 @@ class PTransformTest(unittest.TestCase): vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = Pipeline('DirectPipelineRunner') pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + - [('b', x) for x in vals_2])) + [('b', x) for x in vals_2])) result = pcoll | beam.CombinePerKey(sum) assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))])) pipeline.run() @@ -309,7 +309,7 @@ class PTransformTest(unittest.TestCase): vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = Pipeline('DirectPipelineRunner') pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + - [('b', x) for x in vals_2])) + [('b', x) for x in vals_2])) divisor = pipeline | 'divisor' >> beam.Create([2]) result = pcoll | beam.CombinePerKey( lambda vals, d: max(v for v in vals if v % d == 0), @@ -513,13 +513,14 @@ class PTransformTest(unittest.TestCase): def test_apply_to_list(self): self.assertItemsEqual( [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1)) - self.assertItemsEqual([1], [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2)) + self.assertItemsEqual([1], + [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2)) self.assertItemsEqual([1, 2, 100, 3], - ([1, 2, 3], [100]) | 'flat' >> beam.Flatten()) + ([1, 2, 3], [100]) | beam.Flatten()) join_input = ([('k', 'a')], [('k', 'b'), ('k', 'c')]) self.assertItemsEqual([('k', (['a'], ['b', 'c']))], - join_input | 'join' >> beam.CoGroupByKey()) + join_input | beam.CoGroupByKey()) def test_multi_input_ptransform(self): class DisjointUnion(PTransform): @@ -776,9 +777,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'score' >> (beam.FlatMap(lambda x: [1] if x == 't' else [2]) + | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2]) .with_input_types(str).with_output_types(int)) - | 'upper' >> (beam.FlatMap(lambda x: [x.upper()]) + | ('upper' >> beam.FlatMap(lambda x: [x.upper()]) .with_input_types(str).with_output_types(str))) self.assertEqual("Type hint violation for 'upper': " @@ -866,7 +867,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_filter_type_checks_using_type_hints_method(self): # No error should be raised if this type-checks properly. d = (self.p - | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) + | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) | 'to int' >> beam.Map(lambda x: int(x)) .with_input_types(str).with_output_types(int) | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) @@ -904,9 +905,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_group_by_key_only_output_type_deduction(self): d = (self.p | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'pair' >> (beam.Map(lambda x: (x, ord(x))) + | ('pair' >> beam.Map(lambda x: (x, ord(x))) .with_output_types(typehints.KV[str, str])) - | 'O' >> beam.GroupByKeyOnly()) + | beam.GroupByKeyOnly()) # Output type should correctly be deduced. # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]]. @@ -916,9 +917,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_group_by_key_output_type_deduction(self): d = (self.p | 'str' >> beam.Create(range(20)).with_output_types(int) - | 'pair negative' >> (beam.Map(lambda x: (x % 5, -x)) + | ('pair negative' >> beam.Map(lambda x: (x % 5, -x)) .with_output_types(typehints.KV[int, int])) - | 'T' >> beam.GroupByKey()) + | beam.GroupByKey()) # Output type should correctly be deduced. # GBK should deduce that KV[A, B] is turned into KV[A, Iterable[B]]. @@ -929,8 +930,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # GBK will be passed raw int's here instead of some form of KV[A, B]. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'F' >> beam.GroupByKeyOnly()) + | beam.Create([1, 2, 3]).with_output_types(int) + | beam.GroupByKeyOnly()) self.assertEqual("Input type hint violation at F: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -942,9 +943,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # aliased to Tuple[int, str]. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> (beam.Create(range(5)) + | (beam.Create(range(5)) .with_output_types(typehints.Iterable[int])) - | 'T' >> beam.GroupByKey()) + | beam.GroupByKey()) self.assertEqual("Input type hint violation at T: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -973,7 +974,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): (self.p | 'nums' >> beam.Create(range(5)).with_output_types(int) | 'mod dup' >> beam.Map(lambda x: (x % 2, x)) - | 'G' >> beam.GroupByKeyOnly()) + | beam.GroupByKeyOnly()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' @@ -1092,10 +1093,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # ParDo should receive an instance of type 'str', however an 'int' will be # passed instead. with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | 'n' >> beam.Create([1, 2, 3]) - | 'to int' >> (beam.FlatMap(lambda x: [int(x)]) - .with_input_types(str).with_output_types(int)) - ) + (self.p + | beam.Create([1, 2, 3]) + | ('to int' >> beam.FlatMap(lambda x: [int(x)]) + .with_input_types(str).with_output_types(int))) self.p.run() self.assertStartswith( @@ -1111,8 +1112,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) - | 'add' >> (beam.FlatMap(lambda (x, y): [x + y]) + | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) + | ('add' >> beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, int]).with_output_types(int)) ) self.p.run() @@ -1136,8 +1137,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): lambda x: [float(x)]).with_input_types(int).with_output_types( int).get_type_hints() with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | 'n' >> beam.Create([1, 2, 3]) - | 'to int' >> (beam.FlatMap(lambda x: [float(x)]) + (self.p + | beam.Create([1, 2, 3]) + | ('to int' >> beam.FlatMap(lambda x: [float(x)]) .with_input_types(int).with_output_types(int)) ) self.p.run() @@ -1159,8 +1161,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # of 'int' will be generated instead. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'n' >> beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) - | 'swap' >> (beam.FlatMap(lambda (x, y): [x + y]) + | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) + | ('swap' >> beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, float]) .with_output_types(typehints.Tuple[float, int])) ) @@ -1183,7 +1185,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return a + b with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | 't' >> beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0)) + (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0)) self.p.run() self.assertStartswith( @@ -1199,8 +1201,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 't' >> beam.Create([1, 2, 3, 4]) - | 'add 1' >> (beam.Map(lambda x, one: x + one, 1.0) + | beam.Create([1, 2, 3, 4]) + | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0) .with_input_types(int, int) .with_output_types(float))) self.p.run() @@ -1305,8 +1307,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_combine_pipeline_type_check_using_methods(self): d = (self.p - | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'concat' >> (beam.CombineGlobally(lambda s: ''.join(s)) + | beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | ('concat' >> beam.CombineGlobally(lambda s: ''.join(s)) .with_input_types(str).with_output_types(str))) def matcher(expected): @@ -1321,8 +1323,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 's' >> beam.Create(range(5)).with_output_types(int) - | 'sum' >> (beam.CombineGlobally(lambda s: sum(s)) + | beam.Create(range(5)).with_output_types(int) + | ('sum' >> beam.CombineGlobally(lambda s: sum(s)) .with_input_types(int).with_output_types(int))) assert_that(d, equal_to([10])) @@ -1331,8 +1333,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_combine_pipeline_type_check_violation_using_methods(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'e' >> beam.Create(range(3)).with_output_types(int) - | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | beam.Create(range(3)).with_output_types(int) + | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) self.assertEqual("Input type hint violation at sort join: " @@ -1345,8 +1347,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'e' >> beam.Create(range(3)).with_output_types(int) - | 'sort join' >> (beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | beam.Create(range(3)).with_output_types(int) + | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) self.p.run() @@ -1427,8 +1429,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_per_key_pipeline_checking_satisfied(self): d = (self.p - | 'c' >> beam.Create(range(5)).with_output_types(int) - | 'even group' >> (beam.Map(lambda x: (not x % 2, x)) + | beam.Create(range(5)).with_output_types(int) + | ('even group' >> beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) | 'even mean' >> combine.Mean.PerKey()) @@ -1439,8 +1441,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_per_key_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'e' >> beam.Create(map(str, range(5))).with_output_types(str) - | 'upper pair' >> (beam.Map(lambda x: (x.upper(), x)) + | beam.Create(map(str, range(5))).with_output_types(str) + | ('upper pair' >> beam.Map(lambda x: (x.upper(), x)) .with_output_types(typehints.KV[str, str])) | 'even mean' >> combine.Mean.PerKey()) self.p.run() @@ -1455,8 +1457,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'c' >> beam.Create(range(5)).with_output_types(int) - | 'odd group' >> (beam.Map(lambda x: (bool(x % 2), x)) + | beam.Create(range(5)).with_output_types(int) + | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x)) .with_output_types(typehints.KV[bool, int])) | 'odd mean' >> combine.Mean.PerKey()) @@ -1470,8 +1472,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'c' >> beam.Create(range(5)).with_output_types(int) - | 'odd group' >> (beam.Map(lambda x: (x, str(bool(x % 2)))) + | beam.Create(range(5)).with_output_types(int) + | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2)))) .with_output_types(typehints.KV[int, str])) | 'odd mean' >> combine.Mean.PerKey()) self.p.run() @@ -1514,8 +1516,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perkey_pipeline_type_checking_satisfied(self): d = (self.p - | 'p' >> beam.Create(range(5)).with_output_types(int) - | 'even group' >> (beam.Map(lambda x: (not x % 2, x)) + | beam.Create(range(5)).with_output_types(int) + | ('even group' >> beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) | 'count int' >> combine.Count.PerKey()) @@ -1526,7 +1528,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perkey_pipeline_type_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'p' >> beam.Create(range(5)).with_output_types(int) + | beam.Create(range(5)).with_output_types(int) | 'count int' >> combine.Count.PerKey()) self.assertEqual("Input type hint violation at GroupByKey: " @@ -1538,7 +1540,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | beam.Create(['t', 'e', 's', 't']).with_output_types(str) | 'dup key' >> beam.Map(lambda x: (x, x)) .with_output_types(typehints.KV[str, str]) | 'count dups' >> combine.Count.PerKey()) @@ -1549,7 +1551,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perelement_pipeline_type_checking_satisfied(self): d = (self.p - | 'w' >> beam.Create([1, 1, 2, 3]).with_output_types(int) + | beam.Create([1, 1, 2, 3]).with_output_types(int) | 'count elems' >> combine.Count.PerElement()) self.assertCompatible(typehints.KV[int, int], d.element_type) @@ -1561,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'f' >> beam.Create([1, 1, 2, 3]) + | beam.Create([1, 1, 2, 3]) | 'count elems' >> combine.Count.PerElement()) self.assertEqual('Pipeline type checking is enabled, however no output ' @@ -1573,7 +1575,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'w' >> beam.Create([True, True, False, True, True]) + | beam.Create([True, True, False, True, True]) .with_output_types(bool) | 'count elems' >> combine.Count.PerElement()) @@ -1583,7 +1585,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_top_of_pipeline_checking_satisfied(self): d = (self.p - | 'n' >> beam.Create(range(5, 11)).with_output_types(int) + | beam.Create(range(5, 11)).with_output_types(int) | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[int], @@ -1595,7 +1597,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'n' >> beam.Create(list('testing')).with_output_types(str) + | beam.Create(list('testing')).with_output_types(str) | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[str], d.element_type) @@ -1605,7 +1607,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_per_key_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'n' >> beam.Create(range(100)).with_output_types(int) + | beam.Create(range(100)).with_output_types(int) | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int) | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) @@ -1616,8 +1618,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_per_key_pipeline_checking_satisfied(self): d = (self.p - | 'n' >> beam.Create(range(100)).with_output_types(int) - | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x)) + | beam.Create(range(100)).with_output_types(int) + | ('group mod 3' >> beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) @@ -1630,8 +1632,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'n' >> beam.Create(range(21)) - | 'group mod 3' >> (beam.Map(lambda x: (x % 3, x)) + | beam.Create(range(21)) + | ('group mod 3' >> beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) @@ -1642,7 +1644,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_sample_globally_pipeline_satisfied(self): d = (self.p - | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int) + | beam.Create([2, 2, 3, 3]).with_output_types(int) | 'sample' >> combine.Sample.FixedSizeGlobally(3)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1658,7 +1660,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'm' >> beam.Create([2, 2, 3, 3]).with_output_types(int) + | beam.Create([2, 2, 3, 3]).with_output_types(int) | 'sample' >> combine.Sample.FixedSizeGlobally(2)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1672,7 +1674,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_sample_per_key_pipeline_satisfied(self): d = (self.p - | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) + | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) | 'sample' >> combine.Sample.FixedSizePerKey(2)) @@ -1691,7 +1693,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'm' >> (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) + | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) | 'sample' >> combine.Sample.FixedSizePerKey(1)) @@ -1708,8 +1710,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_to_list_pipeline_check_satisfied(self): d = (self.p - | 'c' >> beam.Create((1, 2, 3, 4)).with_output_types(int) - | 'to list' >> combine.ToList()) + | beam.Create((1, 2, 3, 4)).with_output_types(int) + | combine.ToList()) self.assertCompatible(typehints.List[int], d.element_type) @@ -1724,8 +1726,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'c' >> beam.Create(list('test')).with_output_types(str) - | 'to list' >> combine.ToList()) + | beam.Create(list('test')).with_output_types(str) + | combine.ToList()) self.assertCompatible(typehints.List[str], d.element_type) @@ -1739,8 +1741,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_to_dict_pipeline_check_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'd' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'to dict' >> combine.ToDict()) + | beam.Create([1, 2, 3, 4]).with_output_types(int) + | combine.ToDict()) self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " "requires Tuple[TypeVariable[K], " @@ -1751,9 +1753,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_to_dict_pipeline_check_satisfied(self): d = (self.p | beam.Create( - 'd', [(1, 2), (3, 4)]).with_output_types(typehints.Tuple[int, int]) - | 'to dict' >> combine.ToDict()) + | combine.ToDict()) self.assertCompatible(typehints.Dict[int, int], d.element_type) assert_that(d, equal_to([{1: 2, 3: 4}])) @@ -1763,9 +1764,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'd' >> (beam.Create([('1', 2), ('3', 4)]) + | (beam.Create([('1', 2), ('3', 4)]) .with_output_types(typehints.Tuple[str, int])) - | 'to dict' >> combine.ToDict()) + | combine.ToDict()) self.assertCompatible(typehints.Dict[str, int], d.element_type) assert_that(d, equal_to([{'1': 2, '3': 4}])) @@ -1776,7 +1777,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(TypeError) as e: (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) + | beam.Create([1, 2, 3]).with_output_types(int) | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int)) self.p.run() @@ -1799,7 +1800,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): beam.core.GroupByKeyOnly().infer_output_type(typehints.KV[str, int])) def test_pipeline_inference(self): - created = self.p | 'c' >> beam.Create(['a', 'b', 'c']) + created = self.p | beam.Create(['a', 'b', 'c']) mapped = created | 'pair with 1' >> beam.Map(lambda x: (x, 1)) grouped = mapped | beam.GroupByKey() self.assertEqual(str, created.element_type) @@ -1810,7 +1811,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_inferred_bad_kv_type(self): with self.assertRaises(typehints.TypeCheckError) as e: _ = (self.p - | 't' >> beam.Create(['a', 'b', 'c']) + | beam.Create(['a', 'b', 'c']) | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0)) | beam.GroupByKey()) @@ -1821,11 +1822,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_type_inference_command_line_flag_toggle(self): self.p.options.view_as(TypeOptions).pipeline_type_check = False - x = self.p | 't' >> beam.Create([1, 2, 3, 4]) + x = self.p | 'c1' >> beam.Create([1, 2, 3, 4]) self.assertIsNone(x.element_type) self.p.options.view_as(TypeOptions).pipeline_type_check = True - x = self.p | 'm' >> beam.Create([1, 2, 3, 4]) + x = self.p | 'c2' >> beam.Create([1, 2, 3, 4]) self.assertEqual(int, x.element_type) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/transforms/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index bbb7787..4564cf9 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -220,7 +220,8 @@ def assert_that(actual, matcher, label='assert_that'): class AssertThat(PTransform): def apply(self, pipeline): - return pipeline | 'singleton' >> Create([None]) | Map(match, AllOf(actual)) + return pipeline | 'singleton' >> Create([None]) | Map(match, + AllOf(actual)) def default_label(self): return label http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 4e1ab68..f2e8f12 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -177,7 +177,7 @@ class SideInputTest(unittest.TestCase): bad_side_input = p | 'bad_side' >> beam.Create(['z']) with self.assertRaises(typehints.TypeCheckError): - main_input | 'again' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input)) + main_input | 'bis' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input)) def test_deferred_side_input_iterable(self): @typehints.with_input_types(str, typehints.Iterable[str])