Automatically convert examples to use the `with` syntax for pipelines.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3a62b4f7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3a62b4f7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3a62b4f7 Branch: refs/heads/master Commit: 3a62b4f7b20bda2b3c4ca648f90988d387cfe20d Parents: d0601b3 Author: Robert Bradshaw <[email protected]> Authored: Thu May 18 17:22:25 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Tue May 23 10:19:04 2017 -0700 ---------------------------------------------------------------------- .../examples/complete/autocomplete.py | 19 +- .../examples/complete/autocomplete_test.py | 31 +- .../examples/complete/estimate_pi.py | 11 +- .../examples/complete/estimate_pi_test.py | 12 +- .../examples/complete/game/hourly_team_score.py | 19 +- .../examples/complete/game/user_score.py | 15 +- .../complete/juliaset/juliaset/juliaset.py | 44 +- .../apache_beam/examples/complete/tfidf.py | 21 +- .../apache_beam/examples/complete/tfidf_test.py | 28 +- .../examples/complete/top_wikipedia_sessions.py | 12 +- .../complete/top_wikipedia_sessions_test.py | 9 +- .../examples/cookbook/bigquery_schema.py | 159 +++--- .../examples/cookbook/bigquery_side_input.py | 51 +- .../cookbook/bigquery_side_input_test.py | 39 +- .../examples/cookbook/bigquery_tornadoes.py | 33 +- .../cookbook/bigquery_tornadoes_test.py | 19 +- .../apache_beam/examples/cookbook/coders.py | 16 +- .../examples/cookbook/coders_test.py | 14 +- .../examples/cookbook/custom_ptransform.py | 27 +- .../examples/cookbook/custom_ptransform_test.py | 11 +- .../examples/cookbook/datastore_wordcount.py | 20 +- .../apache_beam/examples/cookbook/filters.py | 21 +- .../examples/cookbook/group_with_coder.py | 43 +- .../examples/cookbook/group_with_coder_test.py | 4 +- .../examples/cookbook/mergecontacts.py | 115 +++-- .../examples/cookbook/mergecontacts_test.py | 3 +- .../examples/cookbook/multiple_output_pardo.py | 72 ++- 
.../cookbook/multiple_output_pardo_test.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 494 +++++++++---------- .../examples/snippets/snippets_test.py | 326 ++++++------ .../apache_beam/examples/streaming_wordcap.py | 24 +- .../apache_beam/examples/streaming_wordcount.py | 44 +- sdks/python/apache_beam/examples/wordcount.py | 1 - .../apache_beam/examples/wordcount_debugging.py | 55 +-- .../apache_beam/examples/wordcount_minimal.py | 33 +- .../python/apache_beam/io/filebasedsink_test.py | 16 +- sdks/python/apache_beam/pipeline.py | 19 +- .../apache_beam/transforms/combiners_test.py | 58 +-- .../apache_beam/transforms/window_test.py | 147 +++--- .../transforms/write_ptransform_test.py | 7 +- .../typehints/typed_pipeline_test.py | 22 +- 41 files changed, 1005 insertions(+), 1111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/autocomplete.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index f0acc3f..ab3397c 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -44,16 +44,15 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). 
pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - (p # pylint: disable=expression-not-assigned - | 'read' >> ReadFromText(known_args.input) - | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - | 'TopPerPrefix' >> TopPerPrefix(5) - | 'format' >> beam.Map( - lambda (prefix, candidates): '%s: %s' % (prefix, candidates)) - | 'write' >> WriteToText(known_args.output)) - p.run() + with beam.Pipeline(options=pipeline_options) as p: + + (p # pylint: disable=expression-not-assigned + | 'read' >> ReadFromText(known_args.input) + | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'TopPerPrefix' >> TopPerPrefix(5) + | 'format' >> beam.Map( + lambda (prefix, candidates): '%s: %s' % (prefix, candidates)) + | 'write' >> WriteToText(known_args.output)) class TopPerPrefix(beam.PTransform): http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/autocomplete_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 378d222..e2c84d6 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -31,22 +31,21 @@ class AutocompleteTest(unittest.TestCase): WORDS = ['this', 'this', 'that', 'to', 'to', 'to'] def test_top_prefixes(self): - p = TestPipeline() - words = p | beam.Create(self.WORDS) - result = words | autocomplete.TopPerPrefix(5) - # values must be hashable for now - result = result | beam.Map(lambda (k, vs): (k, tuple(vs))) - assert_that(result, equal_to( - [ - ('t', ((3, 'to'), (2, 'this'), (1, 'that'))), - ('to', ((3, 'to'), )), - ('th', ((2, 'this'), (1, 'that'))), - ('thi', ((2, 'this'), )), - ('this', ((2, 'this'), )), - ('tha', 
((1, 'that'), )), - ('that', ((1, 'that'), )), - ])) - p.run() + with TestPipeline() as p: + words = p | beam.Create(self.WORDS) + result = words | autocomplete.TopPerPrefix(5) + # values must be hashable for now + result = result | beam.Map(lambda (k, vs): (k, tuple(vs))) + assert_that(result, equal_to( + [ + ('t', ((3, 'to'), (2, 'this'), (1, 'that'))), + ('to', ((3, 'to'), )), + ('th', ((2, 'this'), (1, 'that'))), + ('thi', ((2, 'this'), )), + ('this', ((2, 'this'), )), + ('tha', ((1, 'that'), )), + ('that', ((1, 'that'), )), + ])) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/estimate_pi.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index c709713..7e3c4cd 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -113,14 +113,11 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - (p # pylint: disable=expression-not-assigned - | EstimatePiTransform() - | WriteToText(known_args.output, coder=JsonCoder())) - - # Actually run the pipeline (all operations above are deferred). 
- p.run() + (p # pylint: disable=expression-not-assigned + | EstimatePiTransform() + | WriteToText(known_args.output, coder=JsonCoder())) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/estimate_pi_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py index fd51309..f1cbb0a 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -38,13 +38,13 @@ def in_between(lower, upper): class EstimatePiTest(unittest.TestCase): def test_basics(self): - p = TestPipeline() - result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000) + with TestPipeline() as p: + result = p | 'Estimate' >> estimate_pi.EstimatePiTransform(5000) - # Note: Probabilistically speaking this test can fail with a probability - # that is very small (VERY) given that we run at least 500 thousand trials. - assert_that(result, in_between(3.125, 3.155)) - p.run() + # Note: Probabilistically speaking this test can fail with a probability + # that is very small (VERY) given that we run at least 500 thousand + # trials. 
+ assert_that(result, in_between(3.125, 3.155)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py index e9d7188..9f398d9 100644 --- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py +++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py @@ -276,18 +276,15 @@ def run(argv=None): known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) - p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True - - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) - | HourlyTeamScore( - known_args.start_min, known_args.stop_min, known_args.window_duration) - | WriteWindowedToBigQuery( - known_args.table_name, known_args.dataset, configure_bigquery_write())) - - result = p.run() - result.wait_until_finish() + with beam.Pipeline(options=pipeline_options) as p: + + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) + | HourlyTeamScore( + known_args.start_min, known_args.stop_min, known_args.window_duration) + | WriteWindowedToBigQuery( + known_args.table_name, known_args.dataset, configure_bigquery_write())) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/game/user_score.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/user_score.py b/sdks/python/apache_beam/examples/complete/game/user_score.py index 389d2c6..c9f2738 100644 --- a/sdks/python/apache_beam/examples/complete/game/user_score.py +++ 
b/sdks/python/apache_beam/examples/complete/game/user_score.py @@ -201,16 +201,13 @@ def run(argv=None): known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) # Read events from a file and parse them. - | UserScore() - | WriteToBigQuery( - known_args.table_name, known_args.dataset, configure_bigquery_write())) - - result = p.run() - result.wait_until_finish() + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) # Read events from a file and parse them. + | UserScore() + | WriteToBigQuery( + known_args.table_name, known_args.dataset, configure_bigquery_write())) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 5ff2b78..61e3fd1 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -99,26 +99,24 @@ def run(argv=None): # pylint: disable=missing-docstring help='Output file to write the resulting image to.') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - n = int(known_args.grid_size) - - coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100) - - # Group each coordinate triplet by its x value, then write the coordinates to - # the output file with an x-coordinate grouping per line. 
- # pylint: disable=expression-not-assigned - (coordinates - | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) - | 'x coord' >> beam.GroupByKey() - | 'format' >> beam.Map( - lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) - | WriteToText(known_args.coordinate_output)) - # pylint: enable=expression-not-assigned - return p.run().wait_until_finish() - - # Optionally render the image and save it to a file. - # TODO(silviuc): Add this functionality. - # if p.options.image_output is not None: - # julia_set_image = generate_julia_set_visualization( - # file_with_coordinates, n, 100) - # save_julia_set_visualization(p.options.image_output, julia_set_image) + with beam.Pipeline(argv=pipeline_args) as p: + n = int(known_args.grid_size) + + coordinates = generate_julia_set_colors(p, complex(-.62772, .42193), n, 100) + + # Group each coordinate triplet by its x value, then write the coordinates + # to the output file with an x-coordinate grouping per line. + # pylint: disable=expression-not-assigned + (coordinates + | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) + | 'x coord' >> beam.GroupByKey() + | 'format' >> beam.Map( + lambda (k, coords): ' '.join('(%s, %s, %s)' % c for c in coords)) + | WriteToText(known_args.coordinate_output)) + + # Optionally render the image and save it to a file. + # TODO(silviuc): Add this functionality. 
+ # if p.options.image_output is not None: + # julia_set_image = generate_julia_set_visualization( + # file_with_coordinates, n, 100) + # save_julia_set_visualization(p.options.image_output, julia_set_image) http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index a98d906..a88ff82 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -191,17 +191,16 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Read documents specified by the uris command line option. - pcoll = read_documents(p, glob.glob(known_args.uris)) - # Compute TF-IDF information for each word. - output = pcoll | TfIdf() - # Write the output using a "Write" transform that has side effects. - # pylint: disable=expression-not-assigned - output | 'write' >> WriteToText(known_args.output) - # Execute the pipeline and wait until it is completed. - p.run().wait_until_finish() + with beam.Pipeline(options=pipeline_options) as p: + + # Read documents specified by the uris command line option. + pcoll = read_documents(p, glob.glob(known_args.uris)) + # Compute TF-IDF information for each word. + output = pcoll | TfIdf() + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + output | 'write' >> WriteToText(known_args.output) + # Execute the pipeline and wait until it is completed. 
if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index f177dfc..322426f 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -50,20 +50,20 @@ class TfIdfTest(unittest.TestCase): f.write(contents) def test_tfidf_transform(self): - p = TestPipeline() - uri_to_line = p | 'create sample' >> beam.Create( - [('1.txt', 'abc def ghi'), - ('2.txt', 'abc def'), - ('3.txt', 'abc')]) - result = ( - uri_to_line - | tfidf.TfIdf() - | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) - assert_that(result, equal_to(EXPECTED_RESULTS)) - # Run the pipeline. Note that the assert_that above adds to the pipeline - # a check that the result PCollection contains expected values. To actually - # trigger the check the pipeline must be run. - p.run() + with TestPipeline() as p: + uri_to_line = p | 'create sample' >> beam.Create( + [('1.txt', 'abc def ghi'), + ('2.txt', 'abc def'), + ('3.txt', 'abc')]) + result = ( + uri_to_line + | tfidf.TfIdf() + | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) + assert_that(result, equal_to(EXPECTED_RESULTS)) + # Run the pipeline. Note that the assert_that above adds to the pipeline + # a check that the result PCollection contains expected values. + # To actually trigger the check the pipeline must be run (e.g. by + # exiting the with context). def test_basics(self): # Setup the files with expected content. 
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index aa48e4e..9a9ad78 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -159,14 +159,12 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) - | ComputeTopSessions(known_args.sampling_threshold) - | WriteToText(known_args.output)) - - p.run() + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) + | ComputeTopSessions(known_args.sampling_threshold) + | WriteToText(known_args.output)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py index 5fb6276..ced8a44 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py @@ -52,12 +52,11 @@ class ComputeTopSessionsTest(unittest.TestCase): ] def test_compute_top_sessions(self): - p = TestPipeline() - edits = p | beam.Create(self.EDITS) - result = edits | 
top_wikipedia_sessions.ComputeTopSessions(1.0) + with TestPipeline() as p: + edits = p | beam.Create(self.EDITS) + result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0) - assert_that(result, equal_to(self.EXPECTED)) - p.run() + assert_that(result, equal_to(self.EXPECTED)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py index 400189e..3a8af67 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py @@ -42,86 +42,85 @@ def run(argv=None): 'or DATASET.TABLE.')) known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - - from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position - - table_schema = bigquery.TableSchema() - - # Fields that use standard types. 
- kind_schema = bigquery.TableFieldSchema() - kind_schema.name = 'kind' - kind_schema.type = 'string' - kind_schema.mode = 'nullable' - table_schema.fields.append(kind_schema) - - full_name_schema = bigquery.TableFieldSchema() - full_name_schema.name = 'fullName' - full_name_schema.type = 'string' - full_name_schema.mode = 'required' - table_schema.fields.append(full_name_schema) - - age_schema = bigquery.TableFieldSchema() - age_schema.name = 'age' - age_schema.type = 'integer' - age_schema.mode = 'nullable' - table_schema.fields.append(age_schema) - - gender_schema = bigquery.TableFieldSchema() - gender_schema.name = 'gender' - gender_schema.type = 'string' - gender_schema.mode = 'nullable' - table_schema.fields.append(gender_schema) - - # A nested field - phone_number_schema = bigquery.TableFieldSchema() - phone_number_schema.name = 'phoneNumber' - phone_number_schema.type = 'record' - phone_number_schema.mode = 'nullable' - - area_code = bigquery.TableFieldSchema() - area_code.name = 'areaCode' - area_code.type = 'integer' - area_code.mode = 'nullable' - phone_number_schema.fields.append(area_code) - - number = bigquery.TableFieldSchema() - number.name = 'number' - number.type = 'integer' - number.mode = 'nullable' - phone_number_schema.fields.append(number) - table_schema.fields.append(phone_number_schema) - - # A repeated field. 
- children_schema = bigquery.TableFieldSchema() - children_schema.name = 'children' - children_schema.type = 'string' - children_schema.mode = 'repeated' - table_schema.fields.append(children_schema) - - def create_random_record(record_id): - return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id, - 'age': int(record_id) * 10, 'gender': 'male', - 'phoneNumber': { - 'areaCode': int(record_id) * 100, - 'number': int(record_id) * 100000}, - 'children': ['child' + record_id + '1', - 'child' + record_id + '2', - 'child' + record_id + '3'] - } - - # pylint: disable=expression-not-assigned - record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) - records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) - records | 'write' >> beam.io.Write( - beam.io.BigQuerySink( - known_args.output, - schema=table_schema, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) - - # Run the pipeline (all operations are deferred until run() is called). - p.run() + with beam.Pipeline(argv=pipeline_args) as p: + + from apache_beam.io.gcp.internal.clients import bigquery # pylint: disable=wrong-import-order, wrong-import-position + + table_schema = bigquery.TableSchema() + + # Fields that use standard types. 
+ kind_schema = bigquery.TableFieldSchema() + kind_schema.name = 'kind' + kind_schema.type = 'string' + kind_schema.mode = 'nullable' + table_schema.fields.append(kind_schema) + + full_name_schema = bigquery.TableFieldSchema() + full_name_schema.name = 'fullName' + full_name_schema.type = 'string' + full_name_schema.mode = 'required' + table_schema.fields.append(full_name_schema) + + age_schema = bigquery.TableFieldSchema() + age_schema.name = 'age' + age_schema.type = 'integer' + age_schema.mode = 'nullable' + table_schema.fields.append(age_schema) + + gender_schema = bigquery.TableFieldSchema() + gender_schema.name = 'gender' + gender_schema.type = 'string' + gender_schema.mode = 'nullable' + table_schema.fields.append(gender_schema) + + # A nested field + phone_number_schema = bigquery.TableFieldSchema() + phone_number_schema.name = 'phoneNumber' + phone_number_schema.type = 'record' + phone_number_schema.mode = 'nullable' + + area_code = bigquery.TableFieldSchema() + area_code.name = 'areaCode' + area_code.type = 'integer' + area_code.mode = 'nullable' + phone_number_schema.fields.append(area_code) + + number = bigquery.TableFieldSchema() + number.name = 'number' + number.type = 'integer' + number.mode = 'nullable' + phone_number_schema.fields.append(number) + table_schema.fields.append(phone_number_schema) + + # A repeated field. 
+ children_schema = bigquery.TableFieldSchema() + children_schema.name = 'children' + children_schema.type = 'string' + children_schema.mode = 'repeated' + table_schema.fields.append(children_schema) + + def create_random_record(record_id): + return {'kind': 'kind' + record_id, 'fullName': 'fullName'+record_id, + 'age': int(record_id) * 10, 'gender': 'male', + 'phoneNumber': { + 'areaCode': int(record_id) * 100, + 'number': int(record_id) * 100000}, + 'children': ['child' + record_id + '1', + 'child' + record_id + '2', + 'child' + record_id + '3'] + } + + # pylint: disable=expression-not-assigned + record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) + records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) + records | 'write' >> beam.io.Write( + beam.io.BigQuerySink( + known_args.output, + schema=table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) + + # Run the pipeline (all operations are deferred until run() is called). if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py index 6b28818..9911a67 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py @@ -88,32 +88,31 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). 
pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - group_ids = [] - for i in xrange(0, int(known_args.num_groups)): - group_ids.append('id' + str(i)) - - query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare' - query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare' - ignore_corpus = known_args.ignore_corpus - ignore_word = known_args.ignore_word - - pcoll_corpus = p | 'read corpus' >> beam.io.Read( - beam.io.BigQuerySource(query=query_corpus)) - pcoll_word = p | 'read_words' >> beam.io.Read( - beam.io.BigQuerySource(query=query_word)) - pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create( - [ignore_corpus]) - pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word]) - pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids) - - pcoll_groups = create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word, - pcoll_ignore_corpus, pcoll_ignore_word) - - # pylint:disable=expression-not-assigned - pcoll_groups | WriteToText(known_args.output) - p.run() + with beam.Pipeline(options=pipeline_options) as p: + + group_ids = [] + for i in xrange(0, int(known_args.num_groups)): + group_ids.append('id' + str(i)) + + query_corpus = 'select UNIQUE(corpus) from publicdata:samples.shakespeare' + query_word = 'select UNIQUE(word) from publicdata:samples.shakespeare' + ignore_corpus = known_args.ignore_corpus + ignore_word = known_args.ignore_word + + pcoll_corpus = p | 'read corpus' >> beam.io.Read( + beam.io.BigQuerySource(query=query_corpus)) + pcoll_word = p | 'read_words' >> beam.io.Read( + beam.io.BigQuerySource(query=query_word)) + pcoll_ignore_corpus = p | 'create_ignore_corpus' >> beam.Create( + [ignore_corpus]) + pcoll_ignore_word = p | 'create_ignore_word' >> beam.Create([ignore_word]) + pcoll_group_ids = p | 'create groups' >> beam.Create(group_ids) + + pcoll_groups = 
create_groups(pcoll_group_ids, pcoll_corpus, pcoll_word, + pcoll_ignore_corpus, pcoll_ignore_word) + + # pylint:disable=expression-not-assigned + pcoll_groups | WriteToText(known_args.output) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index b11dc47..964b35b 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -30,25 +30,26 @@ from apache_beam.testing.util import equal_to class BigQuerySideInputTest(unittest.TestCase): def test_create_groups(self): - p = TestPipeline() - - group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C']) - corpus_pcoll = p | 'CreateCorpus' >> beam.Create( - [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) - words_pcoll = p | 'CreateWords' >> beam.Create( - [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) - ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1']) - ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1']) - - groups = bigquery_side_input.create_groups(group_ids_pcoll, corpus_pcoll, - words_pcoll, ignore_corpus_pcoll, - ignore_word_pcoll) - - assert_that(groups, equal_to( - [('A', 'corpus2', 'word2'), - ('B', 'corpus2', 'word2'), - ('C', 'corpus2', 'word2')])) - p.run() + with TestPipeline() as p: + + group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C']) + corpus_pcoll = p | 'CreateCorpus' >> beam.Create( + [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}]) + words_pcoll = p | 'CreateWords' >> beam.Create( + [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}]) + ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> 
beam.Create(['corpus1']) + ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1']) + + groups = bigquery_side_input.create_groups(group_ids_pcoll, + corpus_pcoll, + words_pcoll, + ignore_corpus_pcoll, + ignore_word_pcoll) + + assert_that(groups, equal_to( + [('A', 'corpus2', 'word2'), + ('B', 'corpus2', 'word2'), + ('C', 'corpus2', 'word2')])) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py index ed0c79a..d3b216e 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py @@ -75,23 +75,22 @@ def run(argv=None): 'or DATASET.TABLE.')) known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) - - # Read the table rows into a PCollection. - rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input)) - counts = count_tornadoes(rows) - - # Write the output using a "Write" transform that has side effects. - # pylint: disable=expression-not-assigned - counts | 'write' >> beam.io.Write( - beam.io.BigQuerySink( - known_args.output, - schema='month:INTEGER, tornado_count:INTEGER', - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) - - # Run the pipeline (all operations are deferred until run() is called). - p.run().wait_until_finish() + with beam.Pipeline(argv=pipeline_args) as p: + + # Read the table rows into a PCollection. + rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input)) + counts = count_tornadoes(rows) + + # Write the output using a "Write" transform that has side effects. 
+ # pylint: disable=expression-not-assigned + counts | 'write' >> beam.io.Write( + beam.io.BigQuerySink( + known_args.output, + schema='month:INTEGER, tornado_count:INTEGER', + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) + + # Run the pipeline (all operations are deferred until run() is called). if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py index c926df8..45dcaba 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py @@ -30,16 +30,15 @@ from apache_beam.testing.util import equal_to class BigQueryTornadoesTest(unittest.TestCase): def test_basics(self): - p = TestPipeline() - rows = (p | 'create' >> beam.Create([ - {'month': 1, 'day': 1, 'tornado': False}, - {'month': 1, 'day': 2, 'tornado': True}, - {'month': 1, 'day': 3, 'tornado': True}, - {'month': 2, 'day': 1, 'tornado': True}])) - results = bigquery_tornadoes.count_tornadoes(rows) - assert_that(results, equal_to([{'month': 1, 'tornado_count': 2}, - {'month': 2, 'tornado_count': 1}])) - p.run().wait_until_finish() + with TestPipeline() as p: + rows = (p | 'create' >> beam.Create([ + {'month': 1, 'day': 1, 'tornado': False}, + {'month': 1, 'day': 2, 'tornado': True}, + {'month': 1, 'day': 3, 'tornado': True}, + {'month': 2, 'day': 1, 'tornado': True}])) + results = bigquery_tornadoes.count_tornadoes(rows) + assert_that(results, equal_to([{'month': 1, 'tornado_count': 2}, + {'month': 2, 'tornado_count': 1}])) if __name__ == '__main__': 
http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py index aeeb3c9..f97b0f2 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders.py +++ b/sdks/python/apache_beam/examples/cookbook/coders.py @@ -85,15 +85,13 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - p = beam.Pipeline(argv=pipeline_args) - (p # pylint: disable=expression-not-assigned - | 'read' >> ReadFromText(known_args.input, coder=JsonCoder()) - | 'points' >> beam.FlatMap(compute_points) - | beam.CombinePerKey(sum) - | 'write' >> WriteToText(known_args.output, coder=JsonCoder())) - p.run() + + with beam.Pipeline(options=pipeline_options) as p: + (p # pylint: disable=expression-not-assigned + | 'read' >> ReadFromText(known_args.input, coder=JsonCoder()) + | 'points' >> beam.FlatMap(compute_points) + | beam.CombinePerKey(sum) + | 'write' >> WriteToText(known_args.output, coder=JsonCoder())) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py index f71dad8..988d3c9 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders_test.py +++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py @@ -35,13 +35,13 @@ class CodersTest(unittest.TestCase): {'host': ['Brasil', 1], 'guest': ['Italy', 0]}] def test_compute_points(self): - p = TestPipeline() - records = p 
| 'create' >> beam.Create(self.SAMPLE_RECORDS) - result = (records - | 'points' >> beam.FlatMap(coders.compute_points) - | beam.CombinePerKey(sum)) - assert_that(result, equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)])) - p.run() + with TestPipeline() as p: + records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS) + result = (records + | 'points' >> beam.FlatMap(coders.compute_points) + | beam.CombinePerKey(sum)) + assert_that(result, + equal_to([('Italy', 0), ('Brasil', 6), ('Germany', 3)])) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py index 609f2cd..aee69d2 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py @@ -47,11 +47,10 @@ class Count1(beam.PTransform): def run_count1(known_args, options): """Runs the first example pipeline.""" logging.info('Running first pipeline') - p = beam.Pipeline(options=options) - (p | beam.io.ReadFromText(known_args.input) - | Count1() - | beam.io.WriteToText(known_args.output)) - p.run().wait_until_finish() + with beam.Pipeline(options=options) as p: + (p | beam.io.ReadFromText(known_args.input) + | Count1() + | beam.io.WriteToText(known_args.output)) @beam.ptransform_fn @@ -66,11 +65,10 @@ def Count2(pcoll): # pylint: disable=invalid-name def run_count2(known_args, options): """Runs the second example pipeline.""" logging.info('Running second pipeline') - p = beam.Pipeline(options=options) - (p | ReadFromText(known_args.input) - | Count2() # pylint: disable=no-value-for-parameter - | WriteToText(known_args.output)) - p.run().wait_until_finish() + with beam.Pipeline(options=options) as p: + (p | 
ReadFromText(known_args.input) + | Count2() # pylint: disable=no-value-for-parameter + | WriteToText(known_args.output)) @beam.ptransform_fn @@ -93,11 +91,10 @@ def Count3(pcoll, factor=1): # pylint: disable=invalid-name def run_count3(known_args, options): """Runs the third example pipeline.""" logging.info('Running third pipeline') - p = beam.Pipeline(options=options) - (p | ReadFromText(known_args.input) - | Count3(2) # pylint: disable=no-value-for-parameter - | WriteToText(known_args.output)) - p.run() + with beam.Pipeline(options=options) as p: + (p | ReadFromText(known_args.input) + | Count3(2) # pylint: disable=no-value-for-parameter + | WriteToText(known_args.output)) def get_args(argv): http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py index c7c6dba..7aaccb4 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py @@ -40,12 +40,11 @@ class CustomCountTest(unittest.TestCase): self.run_pipeline(custom_ptransform.Count3(factor), factor=factor) def run_pipeline(self, count_implementation, factor=1): - p = TestPipeline() - words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) - result = words | count_implementation - assert_that( - result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))])) - p.run() + with TestPipeline() as p: + words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) + result = words | count_implementation + assert_that( + result, equal_to([('CAT', (3 * factor)), ('DOG', (2 * factor))])) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 411feb8..7161cff 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -135,18 +135,15 @@ class EntityWrapper(object): def write_to_datastore(project, user_options, pipeline_options): """Creates a pipeline that writes entities to Cloud Datastore.""" - p = beam.Pipeline(options=pipeline_options) - - # pylint: disable=expression-not-assigned - (p - | 'read' >> ReadFromText(user_options.input) - | 'create entity' >> beam.Map( - EntityWrapper(user_options.namespace, user_options.kind, - user_options.ancestor).make_entity) - | 'write to datastore' >> WriteToDatastore(project)) + with beam.Pipeline(options=pipeline_options) as p: - # Actually run the pipeline (all operations above are deferred). - p.run().wait_until_finish() + # pylint: disable=expression-not-assigned + (p + | 'read' >> ReadFromText(user_options.input) + | 'create entity' >> beam.Map( + EntityWrapper(user_options.namespace, user_options.kind, + user_options.ancestor).make_entity) + | 'write to datastore' >> WriteToDatastore(project)) def make_ancestor_query(kind, namespace, ancestor): @@ -196,7 +193,6 @@ def read_from_datastore(project, user_options, pipeline_options): output | 'write' >> beam.io.WriteToText(file_path_prefix=user_options.output, num_shards=user_options.num_shards) - # Actually run the pipeline (all operations above are deferred). result = p.run() # Wait until completion, main thread would access post-completion job results. 
result.wait_until_finish() http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/filters.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters.py b/sdks/python/apache_beam/examples/cookbook/filters.py index 374001c..1fbf763 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters.py +++ b/sdks/python/apache_beam/examples/cookbook/filters.py @@ -86,20 +86,17 @@ def run(argv=None): help='Numeric value of month to filter on.') known_args, pipeline_args = parser.parse_known_args(argv) - p = beam.Pipeline(argv=pipeline_args) + with beam.Pipeline(argv=pipeline_args) as p: - input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input)) + input_data = p | beam.io.Read(beam.io.BigQuerySource(known_args.input)) - # pylint: disable=expression-not-assigned - (filter_cold_days(input_data, known_args.month_filter) - | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink( - known_args.output, - schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT', - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) - - # Actually run the pipeline (all operations above are deferred). 
- p.run() + # pylint: disable=expression-not-assigned + (filter_cold_days(input_data, known_args.month_filter) + | 'SaveToBQ' >> beam.io.Write(beam.io.BigQuerySink( + known_args.output, + schema='year:INTEGER,month:INTEGER,day:INTEGER,mean_temp:FLOAT', + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/group_with_coder.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py index 6bdadae..9c0d04b 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py @@ -95,28 +95,27 @@ def run(args=None): # workflow rely on global context (e.g., a module imported at module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Register the custom coder for the Player class, so that it will be used in - # the computation. - coders.registry.register_coder(Player, PlayerCoder) - - (p # pylint: disable=expression-not-assigned - | ReadFromText(known_args.input) - # The get_players function is annotated with a type hint above, so the type - # system knows the output type of the following operation is a key-value pair - # of a Player and an int. Please see the documentation for details on - # types that are inferred automatically as well as other ways to specify - # type hints. - | beam.Map(get_players) - # The output type hint of the previous step is used to infer that the key - # type of the following operation is the Player type. 
Since a custom coder - # is registered for the Player class above, a PlayerCoder will be used to - # encode Player objects as keys for this combine operation. - | beam.CombinePerKey(sum) - | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)) - | WriteToText(known_args.output)) - return p.run() + with beam.Pipeline(options=pipeline_options) as p: + + # Register the custom coder for the Player class, so that it will be used in + # the computation. + coders.registry.register_coder(Player, PlayerCoder) + + (p # pylint: disable=expression-not-assigned + | ReadFromText(known_args.input) + # The get_players function is annotated with a type hint above, so the type + # system knows the output type of the following operation is a key-value + # pair of a Player and an int. Please see the documentation for details on + # types that are inferred automatically as well as other ways to specify + # type hints. + | beam.Map(get_players) + # The output type hint of the previous step is used to infer that the key + # type of the following operation is the Player type. Since a custom coder + # is registered for the Player class above, a PlayerCoder will be used to + # encode Player objects as keys for this combine operation. 
+ | beam.CombinePerKey(sum) + | beam.Map(lambda (k, v): '%s,%d' % (k.name, v)) + | WriteToText(known_args.output)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index 4e87966..268ba8d 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py @@ -50,7 +50,7 @@ class GroupWithCoderTest(unittest.TestCase): temp_path = self.create_temp_file(self.SAMPLE_RECORDS) group_with_coder.run([ '--input=%s*' % temp_path, - '--output=%s.result' % temp_path]).wait_until_finish() + '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] with open(temp_path + '.result-00000-of-00001') as result_file: @@ -71,7 +71,7 @@ class GroupWithCoderTest(unittest.TestCase): group_with_coder.run([ '--no_pipeline_type_check', '--input=%s*' % temp_path, - '--output=%s.result' % temp_path]).wait_until_finish() + '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] with open(temp_path + '.result-00000-of-00001') as result_file: http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 4f53c61..9acdd90 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -70,64 +70,63 @@ def run(argv=None, assert_results=None): # workflow rely on global context (e.g., a module imported at module level). 
pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - # Helper: read a tab-separated key-value mapping from a text file, escape all - # quotes/backslashes, and convert it a PCollection of (key, value) pairs. - def read_kv_textfile(label, textfile): - return (p - | 'Read: %s' % label >> ReadFromText(textfile) - | 'Backslash: %s' % label >> beam.Map( - lambda x: re.sub(r'\\', r'\\\\', x)) - | 'EscapeQuotes: %s' % label >> beam.Map( - lambda x: re.sub(r'"', r'\"', x)) - | 'Split: %s' % label >> beam.Map( - lambda x: re.split(r'\t+', x, 1))) - - # Read input databases. - email = read_kv_textfile('email', known_args.input_email) - phone = read_kv_textfile('phone', known_args.input_phone) - snailmail = read_kv_textfile('snailmail', known_args.input_snailmail) - - # Group together all entries under the same name. - grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey() - - # Prepare tab-delimited output; something like this: - # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only" - tsv_lines = grouped | beam.Map( - lambda (name, (email, phone, snailmail)): '\t'.join( - ['"%s"' % name, - '"%s"' % ','.join(email), - '"%s"' % ','.join(phone), - '"%s"' % next(iter(snailmail), '')])) - - # Compute some stats about our database of people. - luddites = grouped | beam.Filter( # People without email. - lambda (name, (email, phone, snailmail)): not next(iter(email), None)) - writers = grouped | beam.Filter( # People without phones. - lambda (name, (email, phone, snailmail)): not next(iter(phone), None)) - nomads = grouped | beam.Filter( # People without addresses. 
- lambda (name, (email, phone, snailmail)): not next(iter(snailmail), None)) - - num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() - num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() - num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally() - - # Write tab-delimited output. - # pylint: disable=expression-not-assigned - tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv) - - # TODO(silviuc): Move the assert_results logic to the unit test. - if assert_results is not None: - expected_luddites, expected_writers, expected_nomads = assert_results - assert_that(num_luddites, equal_to([expected_luddites]), - label='assert:luddites') - assert_that(num_writers, equal_to([expected_writers]), - label='assert:writers') - assert_that(num_nomads, equal_to([expected_nomads]), - label='assert:nomads') - # Execute pipeline. - return p.run() + with beam.Pipeline(options=pipeline_options) as p: + + # Helper: read a tab-separated key-value mapping from a text file, + # escape all quotes/backslashes, and convert it a PCollection of + # (key, value) pairs. + def read_kv_textfile(label, textfile): + return (p + | 'Read: %s' % label >> ReadFromText(textfile) + | 'Backslash: %s' % label >> beam.Map( + lambda x: re.sub(r'\\', r'\\\\', x)) + | 'EscapeQuotes: %s' % label >> beam.Map( + lambda x: re.sub(r'"', r'\"', x)) + | 'Split: %s' % label >> beam.Map( + lambda x: re.split(r'\t+', x, 1))) + + # Read input databases. + email = read_kv_textfile('email', known_args.input_email) + phone = read_kv_textfile('phone', known_args.input_phone) + snailmail = read_kv_textfile('snailmail', known_args.input_snailmail) + + # Group together all entries under the same name. 
+ grouped = (email, phone, snailmail) | 'group_by_name' >> beam.CoGroupByKey() + + # Prepare tab-delimited output; something like this: + # "name"<TAB>"email_1,email_2"<TAB>"phone"<TAB>"first_snailmail_only" + tsv_lines = grouped | beam.Map( + lambda (name, (email, phone, snailmail)): '\t'.join( + ['"%s"' % name, + '"%s"' % ','.join(email), + '"%s"' % ','.join(phone), + '"%s"' % next(iter(snailmail), '')])) + + # Compute some stats about our database of people. + luddites = grouped | beam.Filter( # People without email. + lambda (name, (email, phone, snailmail)): not next(iter(email), None)) + writers = grouped | beam.Filter( # People without phones. + lambda (name, (email, phone, snailmail)): not next(iter(phone), None)) + nomads = grouped | beam.Filter( # People without addresses. + lambda (name, (e, p, snailmail)): not next(iter(snailmail), None)) + + num_luddites = luddites | 'Luddites' >> beam.combiners.Count.Globally() + num_writers = writers | 'Writers' >> beam.combiners.Count.Globally() + num_nomads = nomads | 'Nomads' >> beam.combiners.Count.Globally() + + # Write tab-delimited output. + # pylint: disable=expression-not-assigned + tsv_lines | 'WriteTsv' >> WriteToText(known_args.output_tsv) + + # TODO(silviuc): Move the assert_results logic to the unit test. 
+ if assert_results is not None: + expected_luddites, expected_writers, expected_nomads = assert_results + assert_that(num_luddites, equal_to([expected_luddites]), + label='assert:luddites') + assert_that(num_writers, equal_to([expected_writers]), + label='assert:writers') + assert_that(num_nomads, equal_to([expected_nomads]), + label='assert:nomads') if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index 09f71d3..b3be0dd 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -107,13 +107,12 @@ class MergeContactsTest(unittest.TestCase): result_prefix = self.create_temp_file('') - result = mergecontacts.run([ + mergecontacts.run([ '--input_email=%s' % path_email, '--input_phone=%s' % path_phone, '--input_snailmail=%s' % path_snailmail, '--output_tsv=%s.tsv' % result_prefix, '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3)) - result.wait_until_finish() with open('%s.tsv-00000-of-00001' % result_prefix) as f: contents = f.read() http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index 9759f48..2316c66 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -141,43 +141,41 @@ def run(argv=None): # workflow rely on global context (e.g., a module imported at 
module level). pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True - p = beam.Pipeline(options=pipeline_options) - - lines = p | ReadFromText(known_args.input) - - # with_outputs allows accessing the explicitly tagged outputs of a DoFn. - split_lines_result = (lines - | beam.ParDo(SplitLinesToWordsFn()).with_outputs( - SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS, - SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, - main='words')) - - # split_lines_result is an object of type DoOutputsTuple. It supports - # accessing result in alternative ways. - words, _, _ = split_lines_result - short_words = split_lines_result[ - SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS] - character_count = split_lines_result.tag_character_count - - # pylint: disable=expression-not-assigned - (character_count - | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) - | beam.GroupByKey() - | 'count chars' >> beam.Map(lambda (_, counts): sum(counts)) - | 'write chars' >> WriteToText(known_args.output + '-chars')) - - # pylint: disable=expression-not-assigned - (short_words - | 'count short words' >> CountWords() - | 'write short words' >> WriteToText( - known_args.output + '-short-words')) - - # pylint: disable=expression-not-assigned - (words - | 'count words' >> CountWords() - | 'write words' >> WriteToText(known_args.output + '-words')) - - return p.run() + with beam.Pipeline(options=pipeline_options) as p: + + lines = p | ReadFromText(known_args.input) + + # with_outputs allows accessing the explicitly tagged outputs of a DoFn. + split_lines_result = (lines + | beam.ParDo(SplitLinesToWordsFn()).with_outputs( + SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS, + SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, + main='words')) + + # split_lines_result is an object of type DoOutputsTuple. It supports + # accessing result in alternative ways. 
+ words, _, _ = split_lines_result + short_words = split_lines_result[ + SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS] + character_count = split_lines_result.tag_character_count + + # pylint: disable=expression-not-assigned + (character_count + | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x)) + | beam.GroupByKey() + | 'count chars' >> beam.Map(lambda (_, counts): sum(counts)) + | 'write chars' >> WriteToText(known_args.output + '-chars')) + + # pylint: disable=expression-not-assigned + (short_words + | 'count short words' >> CountWords() + | 'write short words' >> WriteToText( + known_args.output + '-short-words')) + + # pylint: disable=expression-not-assigned + (words + | 'count words' >> CountWords() + | 'write words' >> WriteToText(known_args.output + '-words')) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 2c9111c..3ddd668 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -52,7 +52,7 @@ class MultipleOutputParDo(unittest.TestCase): multiple_output_pardo.run([ '--input=%s*' % temp_path, - '--output=%s' % result_prefix]).wait_until_finish() + '--output=%s' % result_prefix]) expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n'))) with open(result_prefix + '-chars-00000-of-00001') as f: http://git-wip-us.apache.org/repos/asf/beam/blob/3a62b4f7/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py 
index 7259572..70929e9 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -147,18 +147,15 @@ def model_pipelines(argv): pipeline_options = PipelineOptions(argv) my_options = pipeline_options.view_as(MyOptions) - p = beam.Pipeline(options=pipeline_options) - - (p - | beam.io.ReadFromText(my_options.input) - | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - | beam.Map(lambda x: (x, 1)) - | beam.combiners.Count.PerKey() - | beam.io.WriteToText(my_options.output)) - - result = p.run() + with beam.Pipeline(options=pipeline_options) as p: + + (p + | beam.io.ReadFromText(my_options.input) + | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + | beam.Map(lambda x: (x, 1)) + | beam.combiners.Count.PerKey() + | beam.io.WriteToText(my_options.output)) # [END model_pipelines] - result.wait_until_finish() def model_pcollection(argv): @@ -178,21 +175,18 @@ def model_pcollection(argv): my_options = pipeline_options.view_as(MyOptions) # [START model_pcollection] - p = beam.Pipeline(options=pipeline_options) + with beam.Pipeline(options=pipeline_options) as p: - lines = (p - | beam.Create([ - 'To be, or not to be: that is the question: ', - 'Whether \'tis nobler in the mind to suffer ', - 'The slings and arrows of outrageous fortune, ', - 'Or to take arms against a sea of troubles, '])) - # [END model_pcollection] + lines = (p + | beam.Create([ + 'To be, or not to be: that is the question: ', + 'Whether \'tis nobler in the mind to suffer ', + 'The slings and arrows of outrageous fortune, ', + 'Or to take arms against a sea of troubles, '])) + # [END model_pcollection] - (lines - | beam.io.WriteToText(my_options.output)) - - result = p.run() - result.wait_until_finish() + (lines + | beam.io.WriteToText(my_options.output)) def pipeline_options_remote(argv): @@ -297,12 +291,10 @@ def pipeline_options_command_line(argv): known_args, pipeline_args = parser.parse_known_args(argv) # Create the 
Pipeline with remaining arguments. - p = beam.Pipeline(argv=pipeline_args) - lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input) - lines | 'WriteToText' >> beam.io.WriteToText(known_args.output) - # [END pipeline_options_command_line] - - p.run().wait_until_finish() + with beam.Pipeline(argv=pipeline_args) as p: + lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input) + lines | 'WriteToText' >> beam.io.WriteToText(known_args.output) + # [END pipeline_options_command_line] def pipeline_logging(lines, output): @@ -329,13 +321,11 @@ def pipeline_logging(lines, output): # Remaining WordCount example code ... # [END pipeline_logging] - p = TestPipeline() # Use TestPipeline for testing. - (p - | beam.Create(lines) - | beam.ParDo(ExtractWordsFn()) - | beam.io.WriteToText(output)) - - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. + (p + | beam.Create(lines) + | beam.ParDo(ExtractWordsFn()) + | beam.io.WriteToText(output)) def pipeline_monitoring(renames): @@ -385,20 +375,19 @@ def pipeline_monitoring(renames): pipeline_options = PipelineOptions() options = pipeline_options.view_as(WordCountOptions) - p = TestPipeline() # Use TestPipeline for testing. + with TestPipeline() as p: # Use TestPipeline for testing. - # [START pipeline_monitoring_execution] - (p - # Read the lines of the input text. - | 'ReadLines' >> beam.io.ReadFromText(options.input) - # Count the words. - | CountWords() - # Write the formatted word counts to output. - | 'WriteCounts' >> beam.io.WriteToText(options.output)) - # [END pipeline_monitoring_execution] + # [START pipeline_monitoring_execution] + (p + # Read the lines of the input text. + | 'ReadLines' >> beam.io.ReadFromText(options.input) + # Count the words. + | CountWords() + # Write the formatted word counts to output. 
+ | 'WriteCounts' >> beam.io.WriteToText(options.output)) + # [END pipeline_monitoring_execution] - p.visit(SnippetUtils.RenameFiles(renames)) - p.run() + p.visit(SnippetUtils.RenameFiles(renames)) def examples_wordcount_minimal(renames): @@ -478,40 +467,39 @@ def examples_wordcount_wordcount(renames): default='gs://my-bucket/input') options = PipelineOptions(argv) - p = beam.Pipeline(options=options) - # [END examples_wordcount_wordcount_options] + with beam.Pipeline(options=options) as p: + # [END examples_wordcount_wordcount_options] - lines = p | beam.io.ReadFromText( - 'gs://dataflow-samples/shakespeare/kinglear.txt') + lines = p | beam.io.ReadFromText( + 'gs://dataflow-samples/shakespeare/kinglear.txt') - # [START examples_wordcount_wordcount_composite] - class CountWords(beam.PTransform): + # [START examples_wordcount_wordcount_composite] + class CountWords(beam.PTransform): - def expand(self, pcoll): - return (pcoll - # Convert lines of text into individual words. - | 'ExtractWords' >> beam.FlatMap( - lambda x: re.findall(r'[A-Za-z\']+', x)) + def expand(self, pcoll): + return (pcoll + # Convert lines of text into individual words. + | 'ExtractWords' >> beam.FlatMap( + lambda x: re.findall(r'[A-Za-z\']+', x)) - # Count the number of times each word occurs. - | beam.combiners.Count.PerElement()) + # Count the number of times each word occurs. 
+ | beam.combiners.Count.PerElement()) - counts = lines | CountWords() - # [END examples_wordcount_wordcount_composite] + counts = lines | CountWords() + # [END examples_wordcount_wordcount_composite] - # [START examples_wordcount_wordcount_dofn] - class FormatAsTextFn(beam.DoFn): + # [START examples_wordcount_wordcount_dofn] + class FormatAsTextFn(beam.DoFn): - def process(self, element): - word, count = element - yield '%s: %s' % (word, count) + def process(self, element): + word, count = element + yield '%s: %s' % (word, count) - formatted = counts | beam.ParDo(FormatAsTextFn()) - # [END examples_wordcount_wordcount_dofn] + formatted = counts | beam.ParDo(FormatAsTextFn()) + # [END examples_wordcount_wordcount_dofn] - formatted | beam.io.WriteToText('gs://my-bucket/counts.txt') - p.visit(SnippetUtils.RenameFiles(renames)) - p.run().wait_until_finish() + formatted | beam.io.WriteToText('gs://my-bucket/counts.txt') + p.visit(SnippetUtils.RenameFiles(renames)) def examples_wordcount_debugging(renames): @@ -558,27 +546,27 @@ def examples_wordcount_debugging(renames): # [END example_wordcount_debugging_logging] # [END example_wordcount_debugging_aggregators] - p = TestPipeline() # Use TestPipeline for testing. - filtered_words = ( - p - | beam.io.ReadFromText( - 'gs://dataflow-samples/shakespeare/kinglear.txt') - | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) - | beam.combiners.Count.PerElement() - | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) + with TestPipeline() as p: # Use TestPipeline for testing. 
+ filtered_words = ( + p + | beam.io.ReadFromText( + 'gs://dataflow-samples/shakespeare/kinglear.txt') + | 'ExtractWords' >> beam.FlatMap( + lambda x: re.findall(r'[A-Za-z\']+', x)) + | beam.combiners.Count.PerElement() + | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) - # [START example_wordcount_debugging_assert] - beam.testing.util.assert_that( - filtered_words, beam.testing.util.equal_to( - [('Flourish', 3), ('stomach', 1)])) - # [END example_wordcount_debugging_assert] + # [START example_wordcount_debugging_assert] + beam.testing.util.assert_that( + filtered_words, beam.testing.util.equal_to( + [('Flourish', 3), ('stomach', 1)])) + # [END example_wordcount_debugging_assert] - output = (filtered_words - | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) - | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt')) + output = (filtered_words + | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt')) - p.visit(SnippetUtils.RenameFiles(renames)) - p.run() + p.visit(SnippetUtils.RenameFiles(renames)) import apache_beam as beam @@ -659,16 +647,14 @@ def model_custom_source(count): # Using the source in an example pipeline. 
# [START model_custom_source_use_new_source] - p = beam.Pipeline(options=PipelineOptions()) - numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count)) - # [END model_custom_source_use_new_source] + with beam.Pipeline(options=PipelineOptions()) as p: + numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count)) + # [END model_custom_source_use_new_source] - lines = numbers | beam.core.Map(lambda number: 'line %d' % number) - assert_that( - lines, equal_to( - ['line ' + str(number) for number in range(0, count)])) - - p.run().wait_until_finish() + lines = numbers | beam.core.Map(lambda number: 'line %d' % number) + assert_that( + lines, equal_to( + ['line ' + str(number) for number in range(0, count)])) # We recommend users to start Source classes with an underscore to discourage # using the Source class directly when a PTransform for the source is @@ -796,14 +782,12 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, # Using the new sink in an example pipeline. 
# [START model_custom_sink_use_new_sink] - p = beam.Pipeline(options=PipelineOptions()) - kvs = p | 'CreateKVs' >> beam.Create(KVs) + with beam.Pipeline(options=PipelineOptions()) as p: + kvs = p | 'CreateKVs' >> beam.Create(KVs) - kvs | 'WriteToSimpleKV' >> beam.io.Write( - SimpleKVSink('http://url_to_simple_kv/', final_table_name)) - # [END model_custom_sink_use_new_sink] - - p.run().wait_until_finish() + kvs | 'WriteToSimpleKV' >> beam.io.Write( + SimpleKVSink('http://url_to_simple_kv/', final_table_name)) + # [END model_custom_sink_use_new_sink] # We recommend users to start Sink class names with an underscore to # discourage using the Sink class directly when a PTransform for the sink is @@ -828,13 +812,11 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, final_table_name = final_table_name_with_ptransform # [START model_custom_sink_use_ptransform] - p = beam.Pipeline(options=PipelineOptions()) - kvs = p | 'CreateKVs' >> beam.core.Create(KVs) - kvs | 'WriteToSimpleKV' >> WriteToKVSink( - 'http://url_to_simple_kv/', final_table_name) - # [END model_custom_sink_use_ptransform] - - p.run().wait_until_finish() + with beam.Pipeline(options=PipelineOptions()) as p: + kvs = p | 'CreateKVs' >> beam.core.Create(KVs) + kvs | 'WriteToSimpleKV' >> WriteToKVSink( + 'http://url_to_simple_kv/', final_table_name) + # [END model_custom_sink_use_ptransform] def model_textio(renames): @@ -847,37 +829,35 @@ def model_textio(renames): from apache_beam.options.pipeline_options import PipelineOptions # [START model_textio_read] - p = beam.Pipeline(options=PipelineOptions()) - # [START model_pipelineio_read] - lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv') - # [END model_pipelineio_read] - # [END model_textio_read] - - # [START model_textio_write] - filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words) - # [START model_pipelineio_write] - filtered_words | 'WriteToText' >> beam.io.WriteToText( - '/path/to/numbers', 
file_name_suffix='.csv') - # [END model_pipelineio_write] - # [END model_textio_write] + with beam.Pipeline(options=PipelineOptions()) as p: + # [START model_pipelineio_read] + lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv') + # [END model_pipelineio_read] + # [END model_textio_read] - p.visit(SnippetUtils.RenameFiles(renames)) - p.run().wait_until_finish() + # [START model_textio_write] + filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words) + # [START model_pipelineio_write] + filtered_words | 'WriteToText' >> beam.io.WriteToText( + '/path/to/numbers', file_name_suffix='.csv') + # [END model_pipelineio_write] + # [END model_textio_write] + + p.visit(SnippetUtils.RenameFiles(renames)) def model_textio_compressed(renames, expected): """Using a Read Transform to read compressed text files.""" - p = TestPipeline() + with TestPipeline() as p: - # [START model_textio_write_compressed] - lines = p | 'ReadFromText' >> beam.io.ReadFromText( - '/path/to/input-*.csv.gz', - compression_type=beam.io.filesystem.CompressionTypes.GZIP) - # [END model_textio_write_compressed] + # [START model_textio_write_compressed] + lines = p | 'ReadFromText' >> beam.io.ReadFromText( + '/path/to/input-*.csv.gz', + compression_type=beam.io.filesystem.CompressionTypes.GZIP) + # [END model_textio_write_compressed] - assert_that(lines, equal_to(expected)) - p.visit(SnippetUtils.RenameFiles(renames)) - p.run().wait_until_finish() + assert_that(lines, equal_to(expected)) + p.visit(SnippetUtils.RenameFiles(renames)) def model_datastoreio(): @@ -987,43 +967,40 @@ def model_composite_transform_example(contents, output_path): # [END composite_ptransform_apply_method] # [END composite_transform_example] - p = TestPipeline() # Use TestPipeline for testing. - (p - | beam.Create(contents) - | CountWords() - | beam.io.WriteToText(output_path)) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. 
+ (p + | beam.Create(contents) + | CountWords() + | beam.io.WriteToText(output_path)) def model_multiple_pcollections_flatten(contents, output_path): """Merging a PCollection with Flatten.""" some_hash_fn = lambda s: ord(s[0]) - import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. partition_fn = lambda element, partitions: some_hash_fn(element) % partitions - - # Partition into deciles - partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3) - pcoll1 = partitioned[0] - pcoll2 = partitioned[1] - pcoll3 = partitioned[2] - - # Flatten them back into 1 - - # A collection of PCollection objects can be represented simply - # as a tuple (or list) of PCollections. - # (The SDK for Python has no separate type to store multiple - # PCollection objects, whether containing the same or different - # types.) - # [START model_multiple_pcollections_flatten] - merged = ( - (pcoll1, pcoll2, pcoll3) - # A list of tuples can be "piped" directly into a Flatten transform. - | beam.Flatten()) - # [END model_multiple_pcollections_flatten] - merged | beam.io.WriteToText(output_path) - - p.run() + import apache_beam as beam + with TestPipeline() as p: # Use TestPipeline for testing. + + # Partition into deciles + partitioned = p | beam.Create(contents) | beam.Partition(partition_fn, 3) + pcoll1 = partitioned[0] + pcoll2 = partitioned[1] + pcoll3 = partitioned[2] + + # Flatten them back into 1 + + # A collection of PCollection objects can be represented simply + # as a tuple (or list) of PCollections. + # (The SDK for Python has no separate type to store multiple + # PCollection objects, whether containing the same or different + # types.) + # [START model_multiple_pcollections_flatten] + merged = ( + (pcoll1, pcoll2, pcoll3) + # A list of tuples can be "piped" directly into a Flatten transform. 
+ | beam.Flatten()) + # [END model_multiple_pcollections_flatten] + merged | beam.io.WriteToText(output_path) def model_multiple_pcollections_partition(contents, output_path): @@ -1034,25 +1011,23 @@ def model_multiple_pcollections_partition(contents, output_path): """Assume i in [0,100).""" return i import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. + with TestPipeline() as p: # Use TestPipeline for testing. - students = p | beam.Create(contents) + students = p | beam.Create(contents) - # [START model_multiple_pcollections_partition] - def partition_fn(student, num_partitions): - return int(get_percentile(student) * num_partitions / 100) + # [START model_multiple_pcollections_partition] + def partition_fn(student, num_partitions): + return int(get_percentile(student) * num_partitions / 100) - by_decile = students | beam.Partition(partition_fn, 10) - # [END model_multiple_pcollections_partition] - # [START model_multiple_pcollections_partition_40th] - fortieth_percentile = by_decile[4] - # [END model_multiple_pcollections_partition_40th] + by_decile = students | beam.Partition(partition_fn, 10) + # [END model_multiple_pcollections_partition] + # [START model_multiple_pcollections_partition_40th] + fortieth_percentile = by_decile[4] + # [END model_multiple_pcollections_partition_40th] - ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile] - | beam.Flatten() - | beam.io.WriteToText(output_path)) - - p.run() + ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile] + | beam.Flatten() + | beam.io.WriteToText(output_path)) def model_group_by_key(contents, output_path): @@ -1060,58 +1035,56 @@ def model_group_by_key(contents, output_path): import re import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. 
- words_and_counts = ( - p - | beam.Create(contents) - | beam.FlatMap(lambda x: re.findall(r'\w+', x)) - | 'one word' >> beam.Map(lambda w: (w, 1))) - # GroupByKey accepts a PCollection of (w, 1) and - # outputs a PCollection of (w, (1, 1, ...)). - # (A key/value pair is just a tuple in Python.) - # This is a somewhat forced example, since one could - # simply use beam.combiners.Count.PerElement here. - # [START model_group_by_key_transform] - grouped_words = words_and_counts | beam.GroupByKey() - # [END model_group_by_key_transform] - (grouped_words - | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))) - | beam.io.WriteToText(output_path)) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. + words_and_counts = ( + p + | beam.Create(contents) + | beam.FlatMap(lambda x: re.findall(r'\w+', x)) + | 'one word' >> beam.Map(lambda w: (w, 1))) + # GroupByKey accepts a PCollection of (w, 1) and + # outputs a PCollection of (w, (1, 1, ...)). + # (A key/value pair is just a tuple in Python.) + # This is a somewhat forced example, since one could + # simply use beam.combiners.Count.PerElement here. + # [START model_group_by_key_transform] + grouped_words = words_and_counts | beam.GroupByKey() + # [END model_group_by_key_transform] + (grouped_words + | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))) + | beam.io.WriteToText(output_path)) def model_co_group_by_key_tuple(email_list, phone_list, output_path): """Applying a CoGroupByKey Transform to a tuple.""" import apache_beam as beam - p = TestPipeline() # Use TestPipeline for testing. - # [START model_group_by_key_cogroupbykey_tuple] - # Each data set is represented by key-value pairs in separate PCollections. - # Both data sets share a common key type (in this example str). - # The email_list contains values such as: ('joe', '[email protected]') with - # multiple possible values for each key. 
- # The phone_list contains values such as: ('mary': '111-222-3333') with - # multiple possible values for each key. - emails = p | 'email' >> beam.Create(email_list) - phones = p | 'phone' >> beam.Create(phone_list) - # The result PCollection contains one key-value element for each key in the - # input PCollections. The key of the pair will be the key from the input and - # the value will be a dictionary with two entries: 'emails' - an iterable of - # all values for the current key in the emails PCollection and 'phones': an - # iterable of all values for the current key in the phones PCollection. - # For instance, if 'emails' contained ('joe', '[email protected]') and - # ('joe', '[email protected]'), then 'result' will contain the element - # ('joe', {'emails': ['[email protected]', '[email protected]'], 'phones': ...}) - result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey() - - def join_info((name, info)): - return '; '.join(['%s' % name, - '%s' % ','.join(info['emails']), - '%s' % ','.join(info['phones'])]) - - contact_lines = result | beam.Map(join_info) - # [END model_group_by_key_cogroupbykey_tuple] - contact_lines | beam.io.WriteToText(output_path) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. + # [START model_group_by_key_cogroupbykey_tuple] + # Each data set is represented by key-value pairs in separate PCollections. + # Both data sets share a common key type (in this example str). + # The email_list contains values such as: ('joe', '[email protected]') with + # multiple possible values for each key. + # The phone_list contains values such as: ('mary': '111-222-3333') with + # multiple possible values for each key. + emails = p | 'email' >> beam.Create(email_list) + phones = p | 'phone' >> beam.Create(phone_list) + # The result PCollection contains one key-value element for each key in the + # input PCollections. 
The key of the pair will be the key from the input and + # the value will be a dictionary with two entries: 'emails' - an iterable of + # all values for the current key in the emails PCollection and 'phones': an + # iterable of all values for the current key in the phones PCollection. + # For instance, if 'emails' contained ('joe', '[email protected]') and + # ('joe', '[email protected]'), then 'result' will contain the element + # ('joe', {'emails': ['[email protected]', '[email protected]'], 'phones': ...}) + result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey() + + def join_info((name, info)): + return '; '.join(['%s' % name, + '%s' % ','.join(info['emails']), + '%s' % ','.join(info['phones'])]) + + contact_lines = result | beam.Map(join_info) + # [END model_group_by_key_cogroupbykey_tuple] + contact_lines | beam.io.WriteToText(output_path) def model_join_using_side_inputs( @@ -1121,35 +1094,34 @@ def model_join_using_side_inputs( import apache_beam as beam from apache_beam.pvalue import AsIter - p = TestPipeline() # Use TestPipeline for testing. - # [START model_join_using_side_inputs] - # This code performs a join by receiving the set of names as an input and - # passing PCollections that contain emails and phone numbers as side inputs - # instead of using CoGroupByKey. 
- names = p | 'names' >> beam.Create(name_list) - emails = p | 'email' >> beam.Create(email_list) - phones = p | 'phone' >> beam.Create(phone_list) - - def join_info(name, emails, phone_numbers): - filtered_emails = [] - for name_in_list, email in emails: - if name_in_list == name: - filtered_emails.append(email) - - filtered_phone_numbers = [] - for name_in_list, phone_number in phone_numbers: - if name_in_list == name: - filtered_phone_numbers.append(phone_number) - - return '; '.join(['%s' % name, - '%s' % ','.join(filtered_emails), - '%s' % ','.join(filtered_phone_numbers)]) - - contact_lines = names | 'CreateContacts' >> beam.core.Map( - join_info, AsIter(emails), AsIter(phones)) - # [END model_join_using_side_inputs] - contact_lines | beam.io.WriteToText(output_path) - p.run() + with TestPipeline() as p: # Use TestPipeline for testing. + # [START model_join_using_side_inputs] + # This code performs a join by receiving the set of names as an input and + # passing PCollections that contain emails and phone numbers as side inputs + # instead of using CoGroupByKey. + names = p | 'names' >> beam.Create(name_list) + emails = p | 'email' >> beam.Create(email_list) + phones = p | 'phone' >> beam.Create(phone_list) + + def join_info(name, emails, phone_numbers): + filtered_emails = [] + for name_in_list, email in emails: + if name_in_list == name: + filtered_emails.append(email) + + filtered_phone_numbers = [] + for name_in_list, phone_number in phone_numbers: + if name_in_list == name: + filtered_phone_numbers.append(phone_number) + + return '; '.join(['%s' % name, + '%s' % ','.join(filtered_emails), + '%s' % ','.join(filtered_phone_numbers)]) + + contact_lines = names | 'CreateContacts' >> beam.core.Map( + join_info, AsIter(emails), AsIter(phones)) + # [END model_join_using_side_inputs] + contact_lines | beam.io.WriteToText(output_path) # [START model_library_transforms_keys]
