Remove unneeded labels, and convert existing labels to UpperCamelCase.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d863e686 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d863e686 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d863e686 Branch: refs/heads/python-sdk Commit: d863e6863b1487aa884151c9a317076dab8facd6 Parents: 0db60e4 Author: Ahmet Altay <[email protected]> Authored: Thu Jan 19 12:28:48 2017 -0800 Committer: Ahmet Altay <[email protected]> Committed: Thu Jan 19 12:28:48 2017 -0800 ---------------------------------------------------------------------- .../apache_beam/examples/complete/tfidf.py | 38 +- .../cookbook/bigquery_side_input_test.py | 13 +- .../apache_beam/examples/cookbook/filters.py | 12 +- .../examples/cookbook/mergecontacts.py | 16 +- .../apache_beam/examples/streaming_wordcount.py | 10 +- sdks/python/apache_beam/io/fileio_test.py | 12 +- sdks/python/apache_beam/io/iobase.py | 12 +- sdks/python/apache_beam/io/textio_test.py | 6 +- sdks/python/apache_beam/runners/runner_test.py | 25 +- .../apache_beam/transforms/ptransform_test.py | 513 +++++++++---------- 10 files changed, 325 insertions(+), 332 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index c048cdd..367e275 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -42,9 +42,9 @@ def read_documents(pipeline, uris): for uri in uris: pcolls.append( pipeline - | 'read: %s' % uri >> ReadFromText(uri) - | 'withkey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri)) - return pcolls | 'flatten read pcolls' >> beam.Flatten() + | 'Read: %s' % uri >> ReadFromText(uri) + | 'WithKey: %s' % uri >> beam.Map(lambda v, uri: (uri, v), uri)) + return pcolls | 'FlattenReadPColls' >> beam.Flatten() class TfIdf(beam.PTransform): @@ -61,9 +61,9 @@ class TfIdf(beam.PTransform): # PCollection to use as side input. total_documents = ( uri_to_content - | 'get uris' >> beam.Keys() - | 'get unique uris' >> beam.RemoveDuplicates() - | ' count uris' >> beam.combiners.Count.Globally()) + | 'GetUris 1' >> beam.Keys() + | 'GetUniqueUris' >> beam.RemoveDuplicates() + | 'CountUris' >> beam.combiners.Count.Globally()) # Create a collection of pairs mapping a URI to each of the words # in the document associated with that that URI. @@ -73,35 +73,35 @@ class TfIdf(beam.PTransform): uri_to_words = ( uri_to_content - | 'split words' >> beam.FlatMap(split_into_words)) + | 'SplitWords' >> beam.FlatMap(split_into_words)) # Compute a mapping from each word to the total number of documents # in which it appears. word_to_doc_count = ( uri_to_words - | 'get unique words per doc' >> beam.RemoveDuplicates() - | 'get words' >> beam.Values() - | 'count docs per word' >> beam.combiners.Count.PerElement()) + | 'GetUniqueWordsPerDoc' >> beam.RemoveDuplicates() + | 'GetWords' >> beam.Values() + | 'CountDocsPerWord' >> beam.combiners.Count.PerElement()) # Compute a mapping from each URI to the total number of words in the # document associated with that URI. uri_to_word_total = ( uri_to_words - | ' get uris' >> beam.Keys() - | 'count words in doc' >> beam.combiners.Count.PerElement()) + | 'GetUris 2' >> beam.Keys() + | 'CountWordsInDoc' >> beam.combiners.Count.PerElement()) # Count, for each (URI, word) pair, the number of occurrences of that word # in the document associated with the URI. uri_and_word_to_count = ( uri_to_words - | 'count word-doc pairs' >> beam.combiners.Count.PerElement()) + | 'CountWord-DocPairs' >> beam.combiners.Count.PerElement()) # Adjust the above collection to a mapping from (URI, word) pairs to counts # into an isomorphic mapping from URI to (word, count) pairs, to prepare # for a join by the URI key. uri_to_word_and_count = ( uri_and_word_to_count - | 'shift keys' >> beam.Map( + | 'ShiftKeys' >> beam.Map( lambda ((uri, word), count): (uri, (word, count)))) # Perform a CoGroupByKey (a sort of pre-join) on the prepared @@ -118,7 +118,7 @@ class TfIdf(beam.PTransform): # ... ]} uri_to_word_and_count_and_total = ( {'word totals': uri_to_word_total, 'word counts': uri_to_word_and_count} - | 'cogroup by uri' >> beam.CoGroupByKey()) + | 'CoGroupByUri' >> beam.CoGroupByKey()) # Compute a mapping from each word to a (URI, term frequency) pair for each # URI. A word's term frequency for a document is simply the number of times @@ -134,7 +134,7 @@ class TfIdf(beam.PTransform): word_to_uri_and_tf = ( uri_to_word_and_count_and_total - | 'compute term frequencies' >> beam.FlatMap(compute_term_frequency)) + | 'ComputeTermFrequencies' >> beam.FlatMap(compute_term_frequency)) # Compute a mapping from each word to its document frequency. # A word's document frequency in a corpus is the number of @@ -149,7 +149,7 @@ class TfIdf(beam.PTransform): # DoFns in this way. word_to_df = ( word_to_doc_count - | 'compute doc frequencies' >> beam.Map( + | 'ComputeDocFrequencies' >> beam.Map( lambda (word, count), total: (word, float(count) / total), AsSingleton(total_documents))) @@ -157,7 +157,7 @@ class TfIdf(beam.PTransform): # each keyed on the word. word_to_uri_and_tf_and_df = ( {'tf': word_to_uri_and_tf, 'df': word_to_df} - | 'cogroup words by tf-df' >> beam.CoGroupByKey()) + | 'CoGroupWordsByTf-df' >> beam.CoGroupByKey()) # Compute a mapping from each word to a (URI, TF-IDF) score for each URI. # There are a variety of definitions of TF-IDF @@ -172,7 +172,7 @@ class TfIdf(beam.PTransform): word_to_uri_and_tfidf = ( word_to_uri_and_tf_and_df - | 'compute tf-idf' >> beam.FlatMap(compute_tf_idf)) + | 'ComputeTf-idf' >> beam.FlatMap(compute_tf_idf)) return word_to_uri_and_tfidf http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 66cab77..5869976 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -30,14 +30,13 @@ class BigQuerySideInputTest(unittest.TestCase): def test_create_groups(self): p = TestPipeline() - group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C']) - corpus_pcoll = p | 'create_corpus' >> beam.Create( + group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C']) + corpus_pcoll = p | 'CreateCorpus' >> beam.Create( [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) - words_pcoll = p | 'create_words' >> beam.Create([{'f': 'word1'}, - {'f': 'word2'}, - {'f': 'word3'}]) - ignore_corpus_pcoll = p | 'create_ignore_corpus' >> beam.Create(['corpus1']) - ignore_word_pcoll = p | 'create_ignore_word' >> beam.Create(['word1']) + words_pcoll = p | 'CreateWords' >> beam.Create( + [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) + ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1']) + ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1']) groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll, words_pcoll, ignore_corpus_pcoll, http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/filters.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index 7c77b9d..d13d823 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -53,21 +53,21 @@ def filter_cold_days(input_data, month_filter): projection_fields = ['year', 'month', 'day', 'mean_temp'] fields_of_interest = ( input_data - | 'projected' >> beam.Map( + | 'Projected' >> beam.Map( lambda row: {f: row[f] for f in projection_fields})) # Compute the global mean temperature. global_mean = AsSingleton( fields_of_interest - | 'extract mean' >> beam.Map(lambda row: row['mean_temp']) - | 'global mean' >> beam.combiners.Mean.Globally()) + | 'ExtractMean' >> beam.Map(lambda row: row['mean_temp']) + | 'GlobalMean' >> beam.combiners.Mean.Globally()) # Filter to the rows representing days in the month of interest # in which the mean daily temperature is below the global mean. return ( fields_of_interest - | 'desired month' >> beam.Filter(lambda row: row['month'] == month_filter) - | 'below mean' >> beam.Filter( + | 'DesiredMonth' >> beam.Filter(lambda row: row['month'] == month_filter) + | 'BelowMean' >> beam.Filter( lambda row, mean: row['mean_temp'] < mean, global_mean)) @@ -92,7 +92,7 @@ def run(argv=None): # pylint: disable=expression-not-assigned (filter_cold_days(input_data, known_args.month_filter) - | 'save to BQ' >> beam.io.Write(beam.io.BigQuerySink( + | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink( known_args.output, schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 55bdc50..c880a9a 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -74,12 +74,12 @@ def run(argv=None, assert_results=None): # quotes/backslashes, and convert it a PCollection of (key, value) pairs. def read_kv_textfile(label, textfile): return (p - | 'read_%s' % label >> ReadFromText(textfile) - | 'backslash_%s' % label >> beam.Map( + | 'Read: %s' % label >> ReadFromText(textfile) + | 'Backslash: %s' % label >> beam.Map( lambda x: re.sub(r'\\', r'\\\\', x)) - | 'escape_quotes_%s' % label >> beam.Map( + | 'EscapeQuotes: %s' % label >> beam.Map( lambda x: re.sub(r'"', r'\"', x)) - | 'split_%s' % label >> beam.Map( + | 'Split: %s' % label >> beam.Map( lambda x: re.split(r'\t+', x, 1))) # Read input databases. @@ -107,13 +107,13 @@ def run(argv=None, assert_results=None): nomads = grouped | beam.Filter( # People without addresses. lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None)) - num_luddites = luddites | 'luddites' >> beam.combiners.Count.Globally() - num_writers = writers | 'writers' >> beam.combiners.Count.Globally() - num_nomads = nomads | 'nomads' >> beam.combiners.Count.Globally() + num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() + num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() + num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally() # Write tab-delimited output. # pylint: disable=expression-not-assigned - tsv_lines | 'write_tsv' >> WriteToText(known_args.output_tsv) + tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv) # TODO(silviuc): Move the assert_results logic to the unit test. if assert_results is not None: http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index e34a64e..7fb2c81 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -52,14 +52,14 @@ def run(argv=None): # Capitalize the characters in each line. transformed = (lines - | 'split' >> ( + | 'Split' >> ( beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) .with_output_types(unicode)) - | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | beam.WindowInto(window.FixedWindows(15, 0)) - | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones))) - | 'format' >> beam.Map(lambda tup: '%s: %d' % tup)) + | 'Group' >> beam.GroupByKey() + | 'Count' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'Format' >> beam.Map(lambda tup: '%s: %d' % tup)) # Write to PubSub. # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 772e5e2..f75bc5d 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -762,8 +762,8 @@ class TestNativeTextFileSink(unittest.TestCase): def test_write_native(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) - pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned + pcoll = pipeline | beam.core.Create(self.lines) + pcoll | beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned pipeline.run() read_result = [] @@ -775,8 +775,8 @@ class TestNativeTextFileSink(unittest.TestCase): def test_write_native_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) - pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned + pcoll = pipeline | beam.core.Create(self.lines) + pcoll | beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path, file_name_suffix='.gz')) pipeline.run() @@ -790,8 +790,8 @@ class TestNativeTextFileSink(unittest.TestCase): def test_write_native_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) - pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned + pcoll = pipeline | beam.core.Create(self.lines) + pcoll | beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( self.path + '.gz', shard_name_template='')) pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 93421a6..12af3b6 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -727,7 +727,7 @@ class Write(ptransform.PTransform): from apache_beam.runners.dataflow.native_io import iobase as dataflow_io if isinstance(self.sink, dataflow_io.NativeSink): # A native sink - return pcoll | 'native_write' >> dataflow_io._NativeWrite(self.sink) + return pcoll | 'NativeWrite' >> dataflow_io._NativeWrite(self.sink) elif isinstance(self.sink, Sink): # A custom sink return pcoll | WriteImpl(self.sink) @@ -748,7 +748,7 @@ class WriteImpl(ptransform.PTransform): def expand(self, pcoll): do_once = pcoll.pipeline | 'DoOnce' >> core.Create([None]) - init_result_coll = do_once | 'initialize_write' >> core.Map( + init_result_coll = do_once | 'InitializeWrite' >> core.Map( lambda _, sink: sink.initialize_write(), self.sink) if getattr(self.sink, 'num_shards', 0): min_shards = self.sink.num_shards @@ -759,20 +759,20 @@ class WriteImpl(ptransform.PTransform): write_result_coll = (keyed_pcoll | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | 'write_bundles' >> core.Map( + | 'WriteBundles' >> core.Map( _write_keyed_bundle, self.sink, AsSingleton(init_result_coll))) else: min_shards = 1 write_result_coll = (pcoll - | 'write_bundles' >> + | 'WriteBundles' >> core.ParDo( _WriteBundleDoFn(), self.sink, AsSingleton(init_result_coll)) - | 'pair' >> core.Map(lambda x: (None, x)) + | 'Pair' >> core.Map(lambda x: (None, x)) | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | 'extract' >> core.FlatMap(lambda x: x[1])) + | 'Extract' >> core.FlatMap(lambda x: x[1])) return do_once | core.FlatMap( 'finalize_write', _finalize_write, http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index ccb64ff..4b85584 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -524,7 +524,7 @@ class TextSinkTest(unittest.TestCase): def test_write_dataflow(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) + pcoll = pipeline | beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned pipeline.run() @@ -537,7 +537,7 @@ class TextSinkTest(unittest.TestCase): def test_write_dataflow_auto_compression(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) + pcoll = pipeline | beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned pipeline.run() @@ -550,7 +550,7 @@ class TextSinkTest(unittest.TestCase): def test_write_dataflow_auto_compression_unsharded(self): pipeline = TestPipeline() - pcoll = pipeline | 'Create' >> beam.core.Create(self.lines) + pcoll = pipeline | beam.core.Create(self.lines) pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index f95b295..f522590 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -84,9 +84,9 @@ class RunnerTest(unittest.TestCase): p = Pipeline(remote_runner, options=PipelineOptions(self.default_properties)) - (p | 'create' >> ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned - | 'do' >> ptransform.FlatMap(lambda x: [(x, x)]) - | 'gbk' >> ptransform.GroupByKey()) + (p | ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned + | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) + | ptransform.GroupByKey()) remote_runner.job = apiclient.Job(p.options) super(DataflowRunner, remote_runner).run(p) @@ -118,8 +118,8 @@ class RunnerTest(unittest.TestCase): now = datetime.now() # pylint: disable=expression-not-assigned - (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5]) - | 'do' >> SpecialParDo(SpecialDoFn(), now)) + (p | ptransform.Create([1, 2, 3, 4, 5]) + | 'Do' >> SpecialParDo(SpecialDoFn(), now)) remote_runner.job = apiclient.Job(p.options) super(DataflowRunner, remote_runner).run(p) @@ -166,8 +166,8 @@ class RunnerTest(unittest.TestCase): p = Pipeline(runner, options=PipelineOptions(self.default_properties)) # pylint: disable=expression-not-assigned - (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5]) - | 'do' >> beam.ParDo(MyDoFn())) + (p | ptransform.Create([1, 2, 3, 4, 5]) + | 'Do' >> beam.ParDo(MyDoFn())) result = p.run() result.wait_until_finish() metrics = result.metrics().query() @@ -178,19 +178,19 @@ class RunnerTest(unittest.TestCase): metrics['counters'], hc.contains_inanyorder( MetricResult( - MetricKey('do', MetricName(namespace, 'elements')), + MetricKey('Do', MetricName(namespace, 'elements')), 5, 5), MetricResult( - MetricKey('do', MetricName(namespace, 'bundles')), + MetricKey('Do', MetricName(namespace, 'bundles')), 1, 1), MetricResult( - MetricKey('do', MetricName(namespace, 'finished_bundles')), + MetricKey('Do', MetricName(namespace, 'finished_bundles')), 1, 1))) hc.assert_that( metrics['distributions'], hc.contains_inanyorder( MetricResult( - MetricKey('do', MetricName(namespace, 'element_dist')), + MetricKey('Do', MetricName(namespace, 'element_dist')), DistributionResult(DistributionData(15, 5, 1, 5)), DistributionResult(DistributionData(15, 5, 1, 5))))) @@ -205,8 +205,7 @@ class RunnerTest(unittest.TestCase): '--temp_location=/dev/null', '--no_auth=True' ])) - rows = p | 'read' >> beam.io.Read( - beam.io.BigQuerySource('dataset.faketable')) + rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable')) with self.assertRaises(ValueError, msg=('Coder for the GroupByKey operation' '"GroupByKey" is not a key-value coder: ' http://git-wip-us.apache.org/repos/asf/beam/blob/d863e686/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 13b963c..827bc83 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -54,8 +54,8 @@ class PTransformTest(unittest.TestCase): str(PTransform())) pa = TestPipeline() - res = pa | 'a_label' >> beam.Create([1, 2]) - self.assertEqual('AppliedPTransform(a_label, Create)', + res = pa | 'ALabel' >> beam.Create([1, 2]) + self.assertEqual('AppliedPTransform(ALabel, Create)', str(res.producer)) pc = TestPipeline() @@ -111,8 +111,8 @@ class PTransformTest(unittest.TestCase): return [context.element + addon] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | 'do' >> beam.ParDo(AddNDoFn(), 10) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.ParDo(AddNDoFn(), 10) assert_that(result, equal_to([11, 12, 13])) pipeline.run() @@ -123,39 +123,39 @@ class PTransformTest(unittest.TestCase): pass pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) with self.assertRaises(ValueError): - pcoll | 'do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s + pcoll | 'Do' >> beam.ParDo(MyDoFn) # Note the lack of ()'s def test_do_with_callable(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | 'do' >> beam.FlatMap(lambda x, addon: [x + addon], 10) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.FlatMap(lambda x, addon: [x + addon], 10) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_side_input_as_arg(self): pipeline = TestPipeline() - side = pipeline | 'side' >> beam.Create([10]) - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | beam.FlatMap( - 'do', lambda x, addon: [x + addon], pvalue.AsSingleton(side)) + side = pipeline | 'Side' >> beam.Create([10]) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.FlatMap( + lambda x, addon: [x + addon], pvalue.AsSingleton(side)) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_side_input_as_keyword_arg(self): pipeline = TestPipeline() - side = pipeline | 'side' >> beam.Create([10]) - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | beam.FlatMap( - 'do', lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side)) + side = pipeline | 'Side' >> beam.Create([10]) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.FlatMap( + lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side)) assert_that(result, equal_to([11, 12, 13])) pipeline.run() def test_do_with_do_fn_returning_string_raises_warning(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) - pcoll | 'do' >> beam.FlatMap(lambda x: x + '1') + pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3']) + pcoll | 'Do' >> beam.FlatMap(lambda x: x + '1') # Since the DoFn directly returns a string we should get an error warning # us. @@ -168,8 +168,8 @@ class PTransformTest(unittest.TestCase): def test_do_with_do_fn_returning_dict_raises_warning(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(['2', '9', '3']) - pcoll | 'do' >> beam.FlatMap(lambda x: {x: '1'}) + pcoll = pipeline | 'Start' >> beam.Create(['2', '9', '3']) + pcoll | 'Do' >> beam.FlatMap(lambda x: {x: '1'}) # Since the DoFn directly returns a dict we should get an error warning # us. @@ -182,9 +182,9 @@ class PTransformTest(unittest.TestCase): def test_do_with_side_outputs_maintains_unique_name(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - r1 = pcoll | 'a' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m') - r2 = pcoll | 'b' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m') + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + r1 = pcoll | 'A' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m') + r2 = pcoll | 'B' >> beam.FlatMap(lambda x: [x + 2]).with_outputs(main='m') assert_that(r1.m, equal_to([2, 3, 4]), label='r1') assert_that(r2.m, equal_to([3, 4, 5]), label='r2') pipeline.run() @@ -195,8 +195,8 @@ class PTransformTest(unittest.TestCase): def incorrect_par_do_fn(x): return x + 5 pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([2, 9, 3]) - pcoll | 'do' >> beam.FlatMap(incorrect_par_do_fn) + pcoll = pipeline | 'Start' >> beam.Create([2, 9, 3]) + pcoll | 'Do' >> beam.FlatMap(incorrect_par_do_fn) # It's a requirement that all user-defined functions to a ParDo return # an iterable. with self.assertRaises(typehints.TypeCheckError) as cm: @@ -216,8 +216,8 @@ class PTransformTest(unittest.TestCase): def finish_bundle(self, c): yield 'finish' pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3]) - result = pcoll | 'do' >> beam.ParDo(MyDoFn()) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) + result = pcoll | 'Do' >> beam.ParDo(MyDoFn()) # May have many bundles, but each has a start and finish. def matcher(): @@ -231,9 +231,8 @@ class PTransformTest(unittest.TestCase): def test_filter(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([1, 2, 3, 4]) - result = pcoll | beam.Filter( - 'filter', lambda x: x % 2 == 0) + pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3, 4]) + result = pcoll | 'Filter' >> beam.Filter(lambda x: x % 2 == 0) assert_that(result, equal_to([2, 4])) pipeline.run() @@ -257,15 +256,15 @@ class PTransformTest(unittest.TestCase): def test_combine_with_combine_fn(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) - result = pcoll | 'mean' >> beam.CombineGlobally(self._MeanCombineFn()) + pcoll = pipeline | 'Start' >> beam.Create(vals) + result = pcoll | 'Mean' >> beam.CombineGlobally(self._MeanCombineFn()) assert_that(result, equal_to([sum(vals) / len(vals)])) pipeline.run() def test_combine_with_callable(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) + pcoll = pipeline | 'Start' >> beam.Create(vals) result = pcoll | beam.CombineGlobally(sum) assert_that(result, equal_to([sum(vals)])) pipeline.run() @@ -273,10 +272,9 @@ class PTransformTest(unittest.TestCase): def test_combine_with_side_input_as_arg(self): values = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(values) - divisor = pipeline | 'divisor' >> beam.Create([2]) - result = pcoll | beam.CombineGlobally( - 'max', + pcoll = pipeline | 'Start' >> beam.Create(values) + divisor = pipeline | 'Divisor' >> beam.Create([2]) + result = pcoll | 'Max' >> beam.CombineGlobally( # Multiples of divisor only. lambda vals, d: max(v for v in vals if v % d == 0), pvalue.AsSingleton(divisor)).without_defaults() @@ -288,9 +286,9 @@ class PTransformTest(unittest.TestCase): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + + pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) - result = pcoll | 'mean' >> beam.CombinePerKey(self._MeanCombineFn()) + result = pcoll | 'Mean' >> beam.CombinePerKey(self._MeanCombineFn()) assert_that(result, equal_to([('a', sum(vals_1) / len(vals_1)), ('b', sum(vals_2) / len(vals_2))])) pipeline.run() @@ -299,7 +297,7 @@ class PTransformTest(unittest.TestCase): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + + pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) result = pcoll | beam.CombinePerKey(sum) assert_that(result, equal_to([('a', sum(vals_1)), ('b', sum(vals_2))])) @@ -309,9 +307,9 @@ class PTransformTest(unittest.TestCase): vals_1 = [1, 2, 3, 4, 5, 6, 7] vals_2 = [2, 4, 6, 8, 10, 12, 14] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(([('a', x) for x in vals_1] + + pcoll = pipeline | 'Start' >> beam.Create(([('a', x) for x in vals_1] + [('b', x) for x in vals_2])) - divisor = pipeline | 'divisor' >> beam.Create([2]) + divisor = pipeline | 'Divisor' >> beam.Create([2]) result = pcoll | beam.CombinePerKey( lambda vals, d: max(v for v in vals if v % d == 0), pvalue.AsSingleton(divisor)) # Multiples of divisor only. @@ -324,7 +322,7 @@ class PTransformTest(unittest.TestCase): pipeline = TestPipeline() pcoll = pipeline | beam.Create( 'start', [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]) - result = pcoll | 'group' >> beam.GroupByKey() + result = pcoll | 'Group' >> beam.GroupByKey() assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])])) pipeline.run() @@ -336,9 +334,9 @@ class PTransformTest(unittest.TestCase): return (context.element % 3) + offset pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) + pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) # Attempt nominal partition operation. - partitions = pcoll | 'part1' >> beam.Partition(SomePartitionFn(), 4, 1) + partitions = pcoll | 'Part 1' >> beam.Partition(SomePartitionFn(), 4, 1) assert_that(partitions[0], equal_to([])) assert_that(partitions[1], equal_to([0, 3, 6]), label='p1') assert_that(partitions[2], equal_to([1, 4, 7]), label='p2') @@ -348,14 +346,14 @@ class PTransformTest(unittest.TestCase): # Check that a bad partition label will yield an error. For the # DirectRunner, this error manifests as an exception. pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) - partitions = pcoll | 'part2' >> beam.Partition(SomePartitionFn(), 4, 10000) + pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) + partitions = pcoll | 'Part 2' >> beam.Partition(SomePartitionFn(), 4, 10000) with self.assertRaises(ValueError): pipeline.run() def test_partition_with_callable(self): pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) + pcoll = pipeline | 'Start' >> beam.Create([0, 1, 2, 3, 4, 5, 6, 7, 8]) partitions = ( pcoll | beam.Partition( 'part', @@ -380,48 +378,47 @@ class PTransformTest(unittest.TestCase): def test_flatten_pcollections(self): pipeline = TestPipeline() - pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) - pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) - result = (pcoll_1, pcoll_2) | 'flatten' >> beam.Flatten() + pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7]) + result = (pcoll_1, pcoll_2) | 'Flatten' >> beam.Flatten() assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7])) pipeline.run() def test_flatten_no_pcollections(self): pipeline = TestPipeline() with self.assertRaises(ValueError): - () | 'pipeline arg missing' >> beam.Flatten() - result = () | 'empty' >> beam.Flatten(pipeline=pipeline) + () | 'PipelineArgMissing' >> beam.Flatten() + result = () | 'Empty' >> beam.Flatten(pipeline=pipeline) assert_that(result, equal_to([])) pipeline.run() def test_flatten_pcollections_in_iterable(self): pipeline = TestPipeline() - pcoll_1 = pipeline | 'start_1' >> beam.Create([0, 1, 2, 3]) - pcoll_2 = pipeline | 'start_2' >> beam.Create([4, 5, 6, 7]) - result = ([pcoll for pcoll in (pcoll_1, pcoll_2)] - | 'flatten' >> beam.Flatten()) + pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create([4, 5, 6, 7]) + result = [pcoll for pcoll in (pcoll_1, pcoll_2)] | beam.Flatten() assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7])) pipeline.run() def test_flatten_input_type_must_be_iterable(self): # Inputs to flatten *must* be an iterable. with self.assertRaises(ValueError): - 4 | 'flatten' >> beam.Flatten() + 4 | beam.Flatten() def test_flatten_input_type_must_be_iterable_of_pcolls(self): # Inputs to flatten *must* be an iterable of PCollections. with self.assertRaises(TypeError): - {'l': 'test'} | 'flatten' >> beam.Flatten() + {'l': 'test'} | beam.Flatten() with self.assertRaises(TypeError): - set([1, 2, 3]) | 'flatten' >> beam.Flatten() + set([1, 2, 3]) | beam.Flatten() def test_co_group_by_key_on_list(self): pipeline = TestPipeline() - pcoll_1 = pipeline | beam.Create( - 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) - pcoll_2 = pipeline | beam.Create( - 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = (pcoll_1, pcoll_2) | 'cgbk' >> beam.CoGroupByKey() + pcoll_1 = pipeline | 'Start 1' >> beam.Create( + [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create( + [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) + result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() assert_that(result, equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])), ('c', ([4], [7, 8]))])) @@ -429,12 +426,11 @@ class PTransformTest(unittest.TestCase): def test_co_group_by_key_on_iterable(self): pipeline = TestPipeline() - pcoll_1 = pipeline | beam.Create( - 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) - pcoll_2 = pipeline | beam.Create( - 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = ([pc for pc in (pcoll_1, pcoll_2)] - | 'cgbk' >> beam.CoGroupByKey()) + pcoll_1 = pipeline | 'Start 1' >> beam.Create( + [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create( + [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) + result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey() assert_that(result, equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])), ('c', ([4], [7, 8]))])) @@ -442,11 +438,11 @@ class PTransformTest(unittest.TestCase): def test_co_group_by_key_on_dict(self): pipeline = TestPipeline() - pcoll_1 = pipeline | beam.Create( - 'start_1', [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) - pcoll_2 = pipeline | beam.Create( - 'start_2', [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) - result = {'X': pcoll_1, 'Y': pcoll_2} | 'cgbk' >> beam.CoGroupByKey() + pcoll_1 = pipeline | 'Start 1' >> beam.Create( + [('a', 1), ('a', 2), ('b', 3), ('c', 4)]) + pcoll_2 = pipeline | 'Start 2' >> beam.Create( + [('a', 5), ('a', 6), ('c', 7), ('c', 8)]) + result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey() assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}), ('b', {'X': [3], 'Y': []}), ('c', {'X': [4], 'Y': [7, 8]})])) @@ -478,8 +474,8 @@ class PTransformTest(unittest.TestCase): def test_keys_and_values(self): pipeline = TestPipeline() - pcoll = pipeline | beam.Create( - 'start', [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)]) + pcoll = pipeline | 'Start' >> beam.Create( + [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)]) keys = pcoll.apply('keys', beam.Keys()) vals = pcoll.apply('vals', beam.Values()) assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys') @@ -488,16 +484,16 @@ class PTransformTest(unittest.TestCase): def test_kv_swap(self): pipeline = TestPipeline() - pcoll = pipeline | beam.Create( - 'start', [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)]) + pcoll = pipeline | 'Start' >> beam.Create( + [(6, 3), (1, 2), (7, 1), (5, 2), (3, 2)]) result = pcoll.apply('swap', beam.KvSwap()) assert_that(result, equal_to([(1, 7), (2, 1), (2, 3), (2, 5), (3, 6)])) pipeline.run() def test_remove_duplicates(self): pipeline = TestPipeline() - pcoll = pipeline | beam.Create( - 'start', [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel']) + pcoll = pipeline | 'Start' >> beam.Create( + [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel']) result = pcoll.apply('nodupes', beam.RemoveDuplicates()) assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel'])) pipeline.run() @@ -507,15 +503,15 @@ class PTransformTest(unittest.TestCase): t = (beam.Map(lambda x: (x, 1)) | beam.GroupByKey() | beam.Map(lambda (x, ones): (x, sum(ones)))) - result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t + result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t assert_that(result, equal_to([('a', 2), ('b', 1)])) pipeline.run() def test_apply_to_list(self): self.assertItemsEqual( - [1, 2, 3], [0, 1, 2] | 'add_one' >> beam.Map(lambda x: x + 1)) + [1, 2, 3], [0, 1, 2] | 'AddOne' >> beam.Map(lambda x: x + 1)) self.assertItemsEqual([1], - [0, 1, 2] | 'odd' >> beam.Filter(lambda x: x % 2)) + [0, 1, 2] | 'Odd' >> beam.Filter(lambda x: x % 2)) self.assertItemsEqual([1, 2, 100, 3], ([1, 2, 3], [100]) | beam.Flatten()) join_input = ([('k', 'a')], @@ -575,47 +571,47 @@ class PTransformLabelsTest(unittest.TestCase): pardo = None def expand(self, pcoll): - self.pardo = '*do*' >> beam.FlatMap(lambda x: [x + 1]) + self.pardo = '*Do*' >> beam.FlatMap(lambda x: [x + 1]) return pcoll | self.pardo def test_chained_ptransforms(self): """Tests that chaining gets proper nesting.""" pipeline = TestPipeline() - map1 = 'map1' >> beam.Map(lambda x: (x, 1)) - gbk = 'gbk' >> beam.GroupByKey() - map2 = 'map2' >> beam.Map(lambda (x, ones): (x, sum(ones))) + map1 = 'Map1' >> beam.Map(lambda x: (x, 1)) + gbk = 'Gbk' >> beam.GroupByKey() + map2 = 'Map2' >> beam.Map(lambda (x, ones): (x, sum(ones))) t = (map1 | gbk | map2) - result = pipeline | 'start' >> beam.Create(['a', 'a', 'b']) | t - self.assertTrue('map1|gbk|map2/map1' in pipeline.applied_labels) - self.assertTrue('map1|gbk|map2/gbk' in pipeline.applied_labels) - self.assertTrue('map1|gbk|map2/map2' in pipeline.applied_labels) + result = pipeline | 'Start' >> beam.Create(['a', 'a', 'b']) | t + self.assertTrue('Map1|Gbk|Map2/Map1' in pipeline.applied_labels) + self.assertTrue('Map1|Gbk|Map2/Gbk' in pipeline.applied_labels) + self.assertTrue('Map1|Gbk|Map2/Map2' in pipeline.applied_labels) assert_that(result, equal_to([('a', 2), ('b', 1)])) pipeline.run() def test_apply_custom_transform_without_label(self): pipeline = TestPipeline() - pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) + pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3]) custom = PTransformLabelsTest.CustomTransform() result = pipeline.apply(custom, pcoll) self.assertTrue('CustomTransform' in pipeline.applied_labels) - self.assertTrue('CustomTransform/*do*' in pipeline.applied_labels) + self.assertTrue('CustomTransform/*Do*' in pipeline.applied_labels) assert_that(result, equal_to([2, 3, 4])) pipeline.run() def test_apply_custom_transform_with_label(self): pipeline = TestPipeline() - pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) - custom = PTransformLabelsTest.CustomTransform('*custom*') + pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3]) + custom = PTransformLabelsTest.CustomTransform('*Custom*') result = pipeline.apply(custom, pcoll) - self.assertTrue('*custom*' in pipeline.applied_labels) - self.assertTrue('*custom*/*do*' in pipeline.applied_labels) + self.assertTrue('*Custom*' in pipeline.applied_labels) + self.assertTrue('*Custom*/*Do*' in pipeline.applied_labels) assert_that(result, equal_to([2, 3, 4])) pipeline.run() def test_combine_without_label(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) + pcoll = pipeline | 'Start' >> beam.Create(vals) combine = beam.CombineGlobally(sum) result = pcoll | combine self.assertTrue('CombineGlobally(sum)' in pipeline.applied_labels) @@ -624,28 +620,28 @@ class PTransformLabelsTest(unittest.TestCase): def test_apply_ptransform_using_decorator(self): pipeline = TestPipeline() - pcoll = pipeline | 'pcoll' >> beam.Create([1, 2, 3]) - sample = SamplePTransform('*sample*') + pcoll = pipeline | 'PColl' >> beam.Create([1, 2, 3]) + sample = SamplePTransform('*Sample*') _ = pcoll | sample - self.assertTrue('*sample*' in pipeline.applied_labels) - self.assertTrue('*sample*/ToPairs' in pipeline.applied_labels) - self.assertTrue('*sample*/Group' in pipeline.applied_labels) - self.assertTrue('*sample*/RemoveDuplicates' in pipeline.applied_labels) + self.assertTrue('*Sample*' in pipeline.applied_labels) + self.assertTrue('*Sample*/ToPairs' in pipeline.applied_labels) + self.assertTrue('*Sample*/Group' in pipeline.applied_labels) + self.assertTrue('*Sample*/RemoveDuplicates' in pipeline.applied_labels) def test_combine_with_label(self): vals = [1, 2, 3, 4, 5, 6, 7] pipeline = TestPipeline() - pcoll = pipeline | 'start' >> beam.Create(vals) - combine = '*sum*' >> beam.CombineGlobally(sum) + pcoll = pipeline | 'Start' >> beam.Create(vals) + combine = '*Sum*' >> beam.CombineGlobally(sum) result = pcoll | combine - self.assertTrue('*sum*' in pipeline.applied_labels) + self.assertTrue('*Sum*' in pipeline.applied_labels) assert_that(result, equal_to([sum(vals)])) pipeline.run() def check_label(self, ptransform, expected_label): pipeline = TestPipeline() - pipeline | 'start' >> beam.Create([('a', 1)]) | ptransform - actual_label = sorted(pipeline.applied_labels - {'start'})[0] + pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform + actual_label = sorted(pipeline.applied_labels - {'Start'})[0] self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label)) def test_default_labels(self): @@ -737,8 +733,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return [context.element + five] d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'add' >> beam.ParDo(AddWithFive(), 5)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Add' >> beam.ParDo(AddWithFive(), 5)) assert_that(d, equal_to([6, 7, 8])) self.p.run() @@ -752,10 +748,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello')) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Upper' >> beam.ParDo(ToUpperCaseWithPrefix(), 'hello')) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires <type 'str'> but got <type 'int'> for context", e.exception.message) @@ -769,8 +765,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return [context.element + num] d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'add' >> beam.ParDo(AddWithNum(), 5)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Add' >> beam.ParDo(AddWithNum(), 5)) assert_that(d, equal_to([6, 7, 8])) self.p.run() @@ -786,11 +782,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 't' >> beam.Create(['1', '2', '3']).with_output_types(str) - | 'add' >> beam.ParDo(AddWithNum(), 5)) + | 'T' >> beam.Create(['1', '2', '3']).with_output_types(str) + | 'Add' >> beam.ParDo(AddWithNum(), 5)) self.p.run() - self.assertEqual("Type hint violation for 'add': " + self.assertEqual("Type hint violation for 'Add': " "requires <type 'int'> but got <type 'str'> for context", e.exception.message) @@ -804,10 +800,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # will receive a str instead, which should result in a raised exception. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create(['b', 'a', 'r']).with_output_types(str) - | 'to str' >> beam.FlatMap(int_to_str)) + | 'S' >> beam.Create(['b', 'a', 'r']).with_output_types(str) + | 'ToStr' >> beam.FlatMap(int_to_str)) - self.assertEqual("Type hint violation for 'to str': " + self.assertEqual("Type hint violation for 'ToStr': " "requires <type 'int'> but got <type 'str'> for a", e.exception.message) @@ -819,8 +815,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # If this type-checks than no error should be raised. d = (self.p - | 't' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'case' >> beam.FlatMap(to_all_upper_case)) + | 'T' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'Case' >> beam.FlatMap(to_all_upper_case)) assert_that(d, equal_to(['T', 'E', 'S', 'T'])) self.p.run() @@ -833,23 +829,23 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # expecting pcoll's of type str instead. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | ('score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2]) + | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | ('Score' >> beam.FlatMap(lambda x: [1] if x == 't' else [2]) .with_input_types(str).with_output_types(int)) - | ('upper' >> beam.FlatMap(lambda x: [x.upper()]) + | ('Upper' >> beam.FlatMap(lambda x: [x.upper()]) .with_input_types(str).with_output_types(str))) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires <type 'str'> but got <type 'int'> for x", e.exception.message) def test_pardo_properly_type_checks_using_type_hint_methods(self): # Pipeline should be created successfully without an error d = (self.p - | 's' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'dup' >> beam.FlatMap(lambda x: [x + x]) + | 'S' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'Dup' >> beam.FlatMap(lambda x: [x + x]) .with_input_types(str).with_output_types(str) - | 'upper' >> beam.FlatMap(lambda x: [x.upper()]) + | 'Upper' >> beam.FlatMap(lambda x: [x.upper()]) .with_input_types(str).with_output_types(str)) assert_that(d, equal_to(['TT', 'EE', 'SS', 'TT'])) @@ -860,19 +856,19 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # int's, while Map is expecting one of str. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'upper' >> beam.Map(lambda x: x.upper()) + | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'Upper' >> beam.Map(lambda x: x.upper()) .with_input_types(str).with_output_types(str)) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires <type 'str'> but got <type 'int'> for x", e.exception.message) def test_map_properly_type_checks_using_type_hints_methods(self): # No error should be raised if this type-checks properly. d = (self.p - | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'to_str' >> beam.Map(lambda x: str(x)) + | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'ToStr' >> beam.Map(lambda x: str(x)) .with_input_types(int).with_output_types(str)) assert_that(d, equal_to(['1', '2', '3', '4'])) self.p.run() @@ -887,10 +883,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # However, 'Map' should detect that Create has hinted an int instead. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 's' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'upper' >> beam.Map(upper)) + | 'S' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'Upper' >> beam.Map(upper)) - self.assertEqual("Type hint violation for 'upper': " + self.assertEqual("Type hint violation for 'Upper': " "requires <type 'str'> but got <type 'int'> for s", e.exception.message) @@ -912,12 +908,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # incoming. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) - | 'lower' >> beam.Map(lambda x: x.lower()) + | 'Strs' >> beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) + | 'Lower' >> beam.Map(lambda x: x.lower()) .with_input_types(str).with_output_types(str) - | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) + | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) - self.assertEqual("Type hint violation for 'below 3': " + self.assertEqual("Type hint violation for 'Below 3': " "requires <type 'int'> but got <type 'str'> for x", e.exception.message) @@ -925,9 +921,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # No error should be raised if this type-checks properly. d = (self.p | beam.Create(['1', '2', '3', '4', '5']).with_output_types(str) - | 'to int' >> beam.Map(lambda x: int(x)) + | 'ToInt' >> beam.Map(lambda x: int(x)) .with_input_types(str).with_output_types(int) - | 'below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) + | 'Below 3' >> beam.Filter(lambda x: x < 3).with_input_types(int)) assert_that(d, equal_to([1, 2])) self.p.run() @@ -939,10 +935,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Func above was hinted to only take a float, yet an int will be passed. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int) - | 'half' >> beam.Filter(more_than_half)) + | 'Ints' >> beam.Create([1, 2, 3, 4]).with_output_types(int) + | 'Half' >> beam.Filter(more_than_half)) - self.assertEqual("Type hint violation for 'half': " + self.assertEqual("Type hint violation for 'Half': " "requires <type 'float'> but got <type 'int'> for a", e.exception.message) @@ -954,15 +950,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Filter should deduce that it returns the same type that it takes. (self.p - | 'str' >> beam.Create(range(5)).with_output_types(int) - | 'half' >> beam.Filter(half) - | 'to bool' >> beam.Map(lambda x: bool(x)) + | 'Str' >> beam.Create(range(5)).with_output_types(int) + | 'Half' >> beam.Filter(half) + | 'ToBool' >> beam.Map(lambda x: bool(x)) .with_input_types(int).with_output_types(bool)) def test_group_by_key_only_output_type_deduction(self): d = (self.p - | 'str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | ('pair' >> beam.Map(lambda x: (x, ord(x))) + | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | ('Pair' >> beam.Map(lambda x: (x, ord(x))) .with_output_types(typehints.KV[str, str])) | beam.GroupByKeyOnly()) @@ -973,8 +969,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_group_by_key_output_type_deduction(self): d = (self.p - | 'str' >> beam.Create(range(20)).with_output_types(int) - | ('pair negative' >> beam.Map(lambda x: (x % 5, -x)) + | 'Str' >> beam.Create(range(20)).with_output_types(int) + | ('PairNegative' >> beam.Map(lambda x: (x % 5, -x)) .with_output_types(typehints.KV[int, int])) | beam.GroupByKey()) @@ -1016,11 +1012,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # information to the ParDo. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'nums' >> beam.Create(range(5)) - | 'mod dup' >> beam.FlatMap(lambda x: (x % 2, x))) + | 'Nums' >> beam.Create(range(5)) + | 'ModDup' >> beam.FlatMap(lambda x: (x % 2, x))) self.assertEqual('Pipeline type checking is enabled, however no output ' - 'type-hint was found for the PTransform Create(nums)', + 'type-hint was found for the PTransform Create(Nums)', e.exception.message) def test_pipeline_checking_gbk_insufficient_type_information(self): @@ -1029,13 +1025,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # information to GBK-only. with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'nums' >> beam.Create(range(5)).with_output_types(int) - | 'mod dup' >> beam.Map(lambda x: (x % 2, x)) + | 'Nums' >> beam.Create(range(5)).with_output_types(int) + | 'ModDup' >> beam.Map(lambda x: (x % 2, x)) | beam.GroupByKeyOnly()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' - 'ParDo(mod dup)', + 'ParDo(ModDup)', e.exception.message) def test_disable_pipeline_type_check(self): @@ -1044,8 +1040,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # The pipeline below should raise a TypeError, however pipeline type # checking was disabled above. (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'lower' >> beam.Map(lambda x: x.lower()) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Lower' >> beam.Map(lambda x: x.lower()) .with_input_types(str).with_output_types(str)) def test_run_time_type_checking_enabled_type_violation(self): @@ -1060,14 +1056,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Function above has been type-hinted to only accept an int. But in the # pipeline execution it'll be passed a string due to the output of Create. (self.p - | 't' >> beam.Create(['some_string']) - | 'to str' >> beam.Map(int_to_string)) + | 'T' >> beam.Create(['some_string']) + | 'ToStr' >> beam.Map(int_to_string)) with self.assertRaises(typehints.TypeCheckError) as e: self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(to str): " + "Runtime type violation detected within ParDo(ToStr): " "Type-hint for argument: 'x' violated. " "Expected an instance of <type 'int'>, " "instead found some_string, an instance of <type 'str'>.") @@ -1084,9 +1080,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # Pipeline checking is off, but the above function should satisfy types at # run-time. result = (self.p - | 't' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g']) + | 'T' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g']) .with_output_types(str) - | 'gen keys' >> beam.Map(group_with_upper_ord) + | 'GenKeys' >> beam.Map(group_with_upper_ord) | 'O' >> beam.GroupByKey()) assert_that(result, equal_to([(1, ['g']), @@ -1106,9 +1102,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return (a % 2, a) (self.p - | 'nums' >> beam.Create(range(5)).with_output_types(int) - | 'is even' >> beam.Map(is_even_as_key) - | 'parity' >> beam.GroupByKey()) + | 'Nums' >> beam.Create(range(5)).with_output_types(int) + | 'IsEven' >> beam.Map(is_even_as_key) + | 'Parity' >> beam.GroupByKey()) # Although all the types appear to be correct when checked at pipeline # construction. Runtime type-checking should detect the 'is_even_as_key' is @@ -1118,7 +1114,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(is even): " + "Runtime type violation detected within ParDo(IsEven): " "Tuple[bool, int] hint type-constraint violated. " "The type of element #0 in the passed tuple is incorrect. " "Expected an instance of type bool, " @@ -1135,9 +1131,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return (a % 2 == 0, a) result = (self.p - | 'nums' >> beam.Create(range(5)).with_output_types(int) - | 'is even' >> beam.Map(is_even_as_key) - | 'parity' >> beam.GroupByKey()) + | 'Nums' >> beam.Create(range(5)).with_output_types(int) + | 'IsEven' >> beam.Map(is_even_as_key) + | 'Parity' >> beam.GroupByKey()) assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])])) self.p.run() @@ -1152,13 +1148,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3]) - | ('to int' >> beam.FlatMap(lambda x: [int(x)]) + | ('ToInt' >> beam.FlatMap(lambda x: [int(x)]) .with_input_types(str).with_output_types(int))) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(to int): " + "Runtime type violation detected within ParDo(ToInt): " "Type-hint for argument: 'x' violated. " "Expected an instance of <type 'str'>, " "instead found 1, an instance of <type 'int'>.") @@ -1170,14 +1166,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) - | ('add' >> beam.FlatMap(lambda (x, y): [x + y]) + | ('Add' >> beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, int]).with_output_types(int)) ) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(add): " + "Runtime type violation detected within ParDo(Add): " "Type-hint for argument: 'y' violated. " "Expected an instance of <type 'int'>, " "instead found 3.0, an instance of <type 'float'>.") @@ -1189,14 +1185,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): # The type-hinted applied via the 'returns()' method indicates the ParDo # should output an instance of type 'int', however a 'float' will be # generated instead. - print "HINTS", beam.FlatMap( - 'to int', + print "HINTS", ('ToInt' >> beam.FlatMap( lambda x: [float(x)]).with_input_types(int).with_output_types( - int).get_type_hints() + int)).get_type_hints() with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3]) - | ('to int' >> beam.FlatMap(lambda x: [float(x)]) + | ('ToInt' >> beam.FlatMap(lambda x: [float(x)]) .with_input_types(int).with_output_types(int)) ) self.p.run() @@ -1204,7 +1199,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(to int): " + "ParDo(ToInt): " "According to type-hint expected output should be " "of type <type 'int'>. Instead, received '1.0', " "an instance of type <type 'float'>.") @@ -1219,7 +1214,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([(1, 3.0), (2, 4.9), (3, 9.5)]) - | ('swap' >> beam.FlatMap(lambda (x, y): [x + y]) + | ('Swap' >> beam.FlatMap(lambda (x, y): [x + y]) .with_input_types(typehints.Tuple[int, float]) .with_output_types(typehints.Tuple[float, int])) ) @@ -1228,7 +1223,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(swap): Tuple type constraint violated. " + "ParDo(Swap): Tuple type constraint violated. " "Valid object instance must be of type 'tuple'. Instead, " "an instance of 'float' was received.") @@ -1242,12 +1237,12 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return a + b with self.assertRaises(typehints.TypeCheckError) as e: - (self.p | beam.Create([1, 2, 3, 4]) | 'add 1' >> beam.Map(add, 1.0)) + (self.p | beam.Create([1, 2, 3, 4]) | 'Add 1' >> beam.Map(add, 1.0)) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(add 1): " + "Runtime type violation detected within ParDo(Add 1): " "Type-hint for argument: 'b' violated. " "Expected an instance of <type 'int'>, " "instead found 1.0, an instance of <type 'float'>.") @@ -1259,14 +1254,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3, 4]) - | ('add 1' >> beam.Map(lambda x, one: x + one, 1.0) + | ('Add 1' >> beam.Map(lambda x, one: x + one, 1.0) .with_input_types(int, int) .with_output_types(float))) self.p.run() self.assertStartswith( e.exception.message, - "Runtime type violation detected within ParDo(add 1): " + "Runtime type violation detected within ParDo(Add 1): " "Type-hint for argument: 'one' violated. " "Expected an instance of <type 'int'>, " "instead found 1.0, an instance of <type 'float'>.") @@ -1278,8 +1273,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return sum(ints) d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'sum' >> beam.CombineGlobally(sum_ints)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Sum' >> beam.CombineGlobally(sum_ints)) self.assertEqual(int, d.element_type) assert_that(d, equal_to([6])) @@ -1293,8 +1288,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'm' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'add' >> beam.CombineGlobally(bad_combine)) + | 'M' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Add' >> beam.CombineGlobally(bad_combine)) self.assertEqual( "All functions for a Combine PTransform must accept a " @@ -1314,9 +1309,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return list(range(n+1)) d = (self.p - | 't' >> beam.Create([1, 2, 3]).with_output_types(int) - | 'sum' >> beam.CombineGlobally(sum_ints) - | 'range' >> beam.ParDo(range_from_zero)) + | 'T' >> beam.Create([1, 2, 3]).with_output_types(int) + | 'Sum' >> beam.CombineGlobally(sum_ints) + | 'Range' >> beam.ParDo(range_from_zero)) self.assertEqual(int, d.element_type) assert_that(d, equal_to([0, 1, 2, 3, 4, 5, 6])) @@ -1331,8 +1326,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): return reduce(operator.mul, ints, 1) d = (self.p - | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int) - | 'mul' >> beam.CombineGlobally(iter_mul)) + | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int) + | 'Mul' >> beam.CombineGlobally(iter_mul)) assert_that(d, equal_to([625])) self.p.run() @@ -1349,14 +1344,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'k' >> beam.Create([5, 5, 5, 5]).with_output_types(int) - | 'mul' >> beam.CombineGlobally(iter_mul)) + | 'K' >> beam.Create([5, 5, 5, 5]).with_output_types(int) + | 'Mul' >> beam.CombineGlobally(iter_mul)) self.p.run() self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): " + "ParDo(Mul/CombinePerKey/Combine/ParDo(CombineValuesDoFn)): " "Tuple[TypeVariable[K], int] hint type-constraint violated. " "The type of element #1 in the passed tuple is incorrect. " "Expected an instance of type int, " @@ -1381,7 +1376,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('sum' >> beam.CombineGlobally(lambda s: sum(s)) + | ('Sum' >> beam.CombineGlobally(lambda s: sum(s)) .with_input_types(int).with_output_types(int))) assert_that(d, equal_to([10])) @@ -1391,10 +1386,10 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(3)).with_output_types(int) - | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) - self.assertEqual("Input type hint violation at sort join: " + self.assertEqual("Input type hint violation at SortJoin: " "expected <type 'str'>, got <type 'int'>", e.exception.message) @@ -1405,14 +1400,14 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(3)).with_output_types(int) - | ('sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) .with_input_types(str).with_output_types(str))) self.p.run() self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(sort join/KeyWithVoid): " + "ParDo(SortJoin/KeyWithVoid): " "Type-hint for argument: 'v' violated. " "Expected an instance of <type 'str'>, " "instead found 0, an instance of <type 'int'>.") @@ -1422,20 +1417,20 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'e' >> beam.Create(range(3)).with_output_types(int) - | 'sort join' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) - | 'f' >> beam.Map(lambda x: x + 1)) + | 'E' >> beam.Create(range(3)).with_output_types(int) + | 'SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s))) + | 'F' >> beam.Map(lambda x: x + 1)) self.assertEqual( 'Pipeline type checking is enabled, ' 'however no output type-hint was found for the PTransform ' - 'ParDo(sort join/CombinePerKey/Combine/ParDo(CombineValuesDoFn))', + 'ParDo(SortJoin/CombinePerKey/Combine/ParDo(CombineValuesDoFn))', e.exception.message) def test_mean_globally_pipeline_checking_satisfied(self): d = (self.p - | 'c' >> beam.Create(range(5)).with_output_types(int) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(range(5)).with_output_types(int) + | 'Mean' >> combine.Mean.Globally()) self.assertTrue(d.element_type is float) assert_that(d, equal_to([2.0])) @@ -1444,8 +1439,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_globally_pipeline_checking_violated(self): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'c' >> beam.Create(['test']).with_output_types(str) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(['test']).with_output_types(str) + | 'Mean' >> combine.Mean.Globally()) self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " "requires Tuple[TypeVariable[K], " @@ -1457,8 +1452,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'c' >> beam.Create(range(5)).with_output_types(int) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(range(5)).with_output_types(int) + | 'Mean' >> combine.Mean.Globally()) self.assertTrue(d.element_type is float) assert_that(d, equal_to([2.0])) @@ -1470,8 +1465,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | 'c' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'mean' >> combine.Mean.Globally()) + | 'C' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) + | 'Mean' >> combine.Mean.Globally()) self.p.run() self.assertEqual("Runtime type violation detected for transform input " "when executing ParDoFlatMap(Combine): Tuple[Any, " @@ -1487,9 +1482,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_mean_per_key_pipeline_checking_satisfied(self): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('even group' >> beam.Map(lambda x: (not x % 2, x)) + | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) - | 'even mean' >> combine.Mean.PerKey()) + | 'EvenMean' >> combine.Mean.PerKey()) self.assertCompatible(typehints.KV[bool, float], d.element_type) assert_that(d, equal_to([(False, 2.0), (True, 2.0)])) @@ -1499,9 +1494,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(map(str, range(5))).with_output_types(str) - | ('upper pair' >> beam.Map(lambda x: (x.upper(), x)) + | ('UpperPair' >> beam.Map(lambda x: (x.upper(), x)) .with_output_types(typehints.KV[str, str])) - | 'even mean' >> combine.Mean.PerKey()) + | 'EvenMean' >> combine.Mean.PerKey()) self.p.run() self.assertEqual("Type hint violation for 'ParDo(CombineValuesDoFn)': " @@ -1515,9 +1510,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('odd group' >> beam.Map(lambda x: (bool(x % 2), x)) + | ('OddGroup' >> beam.Map(lambda x: (bool(x % 2), x)) .with_output_types(typehints.KV[bool, int])) - | 'odd mean' >> combine.Mean.PerKey()) + | 'OddMean' >> combine.Mean.PerKey()) self.assertCompatible(typehints.KV[bool, float], d.element_type) assert_that(d, equal_to([(False, 2.0), (True, 2.0)])) @@ -1530,15 +1525,15 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(5)).with_output_types(int) - | ('odd group' >> beam.Map(lambda x: (x, str(bool(x % 2)))) + | ('OddGroup' >> beam.Map(lambda x: (x, str(bool(x % 2)))) .with_output_types(typehints.KV[int, str])) - | 'odd mean' >> combine.Mean.PerKey()) + | 'OddMean' >> combine.Mean.PerKey()) self.p.run() self.assertStartswith( e.exception.message, "Runtime type violation detected within " - "ParDo(odd mean/CombinePerKey(MeanCombineFn)/" + "ParDo(OddMean/CombinePerKey(MeanCombineFn)/" "Combine/ParDo(CombineValuesDoFn)): " "Type-hint for argument: 'p_context' violated: " "Tuple[TypeVariable[K], Iterable[Union[float, int, long]]]" @@ -1553,8 +1548,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_globally_pipeline_type_checking_satisfied(self): d = (self.p - | 'p' >> beam.Create(range(5)).with_output_types(int) - | 'count int' >> combine.Count.Globally()) + | 'P' >> beam.Create(range(5)).with_output_types(int) + | 'CountInt' >> combine.Count.Globally()) self.assertTrue(d.element_type is int) assert_that(d, equal_to([5])) @@ -1564,8 +1559,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.options.view_as(TypeOptions).runtime_type_check = True d = (self.p - | 'p' >> beam.Create(range(5)).with_output_types(int) - | 'count int' >> combine.Count.Globally()) + | 'P' >> beam.Create(range(5)).with_output_types(int) + | 'CountInt' >> combine.Count.Globally()) self.assertTrue(d.element_type is int) assert_that(d, equal_to([5])) @@ -1574,9 +1569,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perkey_pipeline_type_checking_satisfied(self): d = (self.p | beam.Create(range(5)).with_output_types(int) - | ('even group' >> beam.Map(lambda x: (not x % 2, x)) + | ('EvenGroup' >> beam.Map(lambda x: (not x % 2, x)) .with_output_types(typehints.KV[bool, int])) - | 'count int' >> combine.Count.PerKey()) + | 'CountInt' >> combine.Count.PerKey()) self.assertCompatible(typehints.KV[bool, int], d.element_type) assert_that(d, equal_to([(False, 2), (True, 3)])) @@ -1586,7 +1581,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(5)).with_output_types(int) - | 'count int' >> combine.Count.PerKey()) + | 'CountInt' >> combine.Count.PerKey()) self.assertEqual("Input type hint violation at GroupByKey: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1598,9 +1593,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | beam.Create(['t', 'e', 's', 't']).with_output_types(str) - | 'dup key' >> beam.Map(lambda x: (x, x)) + | 'DupKey' >> beam.Map(lambda x: (x, x)) .with_output_types(typehints.KV[str, str]) - | 'count dups' >> combine.Count.PerKey()) + | 'CountDups' >> combine.Count.PerKey()) self.assertCompatible(typehints.KV[str, int], d.element_type) assert_that(d, equal_to([('e', 1), ('s', 1), ('t', 2)])) @@ -1609,7 +1604,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_count_perelement_pipeline_type_checking_satisfied(self): d = (self.p | beam.Create([1, 1, 2, 3]).with_output_types(int) - | 'count elems' >> combine.Count.PerElement()) + | 'CountElems' >> combine.Count.PerElement()) self.assertCompatible(typehints.KV[int, int], d.element_type) assert_that(d, equal_to([(1, 2), (2, 1), (3, 1)])) @@ -1621,7 +1616,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | 'f' >> beam.Create([1, 1, 2, 3]) - | 'count elems' >> combine.Count.PerElement()) + | 'CountElems' >> combine.Count.PerElement()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' @@ -1634,7 +1629,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | beam.Create([True, True, False, True, True]) .with_output_types(bool) - | 'count elems' >> combine.Count.PerElement()) + | 'CountElems' >> combine.Count.PerElement()) self.assertCompatible(typehints.KV[bool, int], d.element_type) assert_that(d, equal_to([(False, 1), (True, 4)])) @@ -1643,7 +1638,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_top_of_pipeline_checking_satisfied(self): d = (self.p | beam.Create(range(5, 11)).with_output_types(int) - | 'top 3' >> combine.Top.Of(3, lambda x, y: x < y)) + | 'Top 3' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1655,7 +1650,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | beam.Create(list('testing')).with_output_types(str) - | 'acii top' >> combine.Top.Of(3, lambda x, y: x < y)) + | 'AciiTop' >> combine.Top.Of(3, lambda x, y: x < y)) self.assertCompatible(typehints.Iterable[str], d.element_type) assert_that(d, equal_to([['t', 't', 's']])) @@ -1665,8 +1660,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create(range(100)).with_output_types(int) - | 'num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int) - | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) + | 'Num + 1' >> beam.Map(lambda x: x + 1).with_output_types(int) + | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertEqual("Input type hint violation at GroupByKey: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1676,9 +1671,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_per_key_pipeline_checking_satisfied(self): d = (self.p | beam.Create(range(100)).with_output_types(int) - | ('group mod 3' >> beam.Map(lambda x: (x % 3, x)) + | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) - | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) + | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertCompatible(typehints.Tuple[int, typehints.Iterable[int]], d.element_type) @@ -1690,9 +1685,9 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | beam.Create(range(21)) - | ('group mod 3' >> beam.Map(lambda x: (x % 3, x)) + | ('GroupMod 3' >> beam.Map(lambda x: (x % 3, x)) .with_output_types(typehints.KV[int, int])) - | 'top mod' >> combine.Top.PerKey(1, lambda a, b: a < b)) + | 'TopMod' >> combine.Top.PerKey(1, lambda a, b: a < b)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1702,7 +1697,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_sample_globally_pipeline_satisfied(self): d = (self.p | beam.Create([2, 2, 3, 3]).with_output_types(int) - | 'sample' >> combine.Sample.FixedSizeGlobally(3)) + | 'Sample' >> combine.Sample.FixedSizeGlobally(3)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1718,7 +1713,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | beam.Create([2, 2, 3, 3]).with_output_types(int) - | 'sample' >> combine.Sample.FixedSizeGlobally(2)) + | 'Sample' >> combine.Sample.FixedSizeGlobally(2)) self.assertCompatible(typehints.Iterable[int], d.element_type) @@ -1733,7 +1728,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) - | 'sample' >> combine.Sample.FixedSizePerKey(2)) + | 'Sample' >> combine.Sample.FixedSizePerKey(2)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1752,7 +1747,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): d = (self.p | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) .with_output_types(typehints.KV[int, int])) - | 'sample' >> combine.Sample.FixedSizePerKey(1)) + | 'Sample' >> combine.Sample.FixedSizePerKey(1)) self.assertCompatible(typehints.KV[int, typehints.Iterable[int]], d.element_type) @@ -1835,13 +1830,13 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(TypeError) as e: (self.p | beam.Create([1, 2, 3]).with_output_types(int) - | 'len' >> beam.Map(lambda x: len(x)).with_output_types(int)) + | 'Len' >> beam.Map(lambda x: len(x)).with_output_types(int)) self.p.run() # Our special type-checking related TypeError shouldn't have been raised. # Instead the above pipeline should have triggered a regular Python runtime # TypeError. - self.assertEqual("object of type 'int' has no len() [while running 'len']", + self.assertEqual("object of type 'int' has no len() [while running 'Len']", e.exception.message) self.assertFalse(isinstance(e, typehints.TypeCheckError)) @@ -1869,7 +1864,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: _ = (self.p | beam.Create(['a', 'b', 'c']) - | 'ungroupable' >> beam.Map(lambda x: (x, 0, 1.0)) + | 'Ungroupable' >> beam.Map(lambda x: (x, 0, 1.0)) | beam.GroupByKey()) self.assertEqual('Input type hint violation at GroupByKey: ' @@ -1879,11 +1874,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_type_inference_command_line_flag_toggle(self): self.p.options.view_as(TypeOptions).pipeline_type_check = False - x = self.p | 'c1' >> beam.Create([1, 2, 3, 4]) + x = self.p | 'C1' >> beam.Create([1, 2, 3, 4]) self.assertIsNone(x.element_type) self.p.options.view_as(TypeOptions).pipeline_type_check = True - x = self.p | 'c2' >> beam.Create([1, 2, 3, 4]) + x = self.p | 'C2' >> beam.Create([1, 2, 3, 4]) self.assertEqual(int, x.element_type)
