Changed tests in examples/ and io/ to use TestPipeline. Removed wait_until_finish except for a few exceptions: - tfidf: as an example usage. - some examples in cookbook - they run examples directly, and we did not want to update the examples to use TestPipeline. - some snippets - if the pipeline creation is part of the snippet and it was not easy to override.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/703c1bc1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/703c1bc1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/703c1bc1 Branch: refs/heads/python-sdk Commit: 703c1bc18ac28aa1aa75a9359c1bcc4fbdd28f35 Parents: 2e49f51 Author: Ahmet Altay <[email protected]> Authored: Sat Jan 14 23:49:25 2017 -0800 Committer: Robert Bradshaw <[email protected]> Committed: Wed Jan 18 09:55:37 2017 -0800 ---------------------------------------------------------------------- .../examples/complete/autocomplete_test.py | 3 +- .../examples/complete/estimate_pi_test.py | 4 +- .../complete/juliaset/juliaset/juliaset.py | 2 +- .../complete/juliaset/juliaset/juliaset_test.py | 2 +- .../examples/complete/juliaset/juliaset_main.py | 2 +- .../apache_beam/examples/complete/tfidf.py | 3 +- .../apache_beam/examples/complete/tfidf_test.py | 3 +- .../examples/complete/top_wikipedia_sessions.py | 2 +- .../complete/top_wikipedia_sessions_test.py | 3 +- .../cookbook/bigquery_side_input_test.py | 3 +- .../cookbook/bigquery_tornadoes_test.py | 2 +- .../apache_beam/examples/cookbook/bigshuffle.py | 2 +- .../examples/cookbook/bigshuffle_test.py | 2 +- .../examples/cookbook/coders_test.py | 3 +- .../examples/cookbook/combiners_test.py | 5 +- .../examples/cookbook/custom_ptransform.py | 4 +- .../examples/cookbook/custom_ptransform_test.py | 3 +- .../examples/cookbook/datastore_wordcount.py | 7 ++- .../examples/cookbook/filters_test.py | 3 +- .../examples/cookbook/group_with_coder.py | 2 +- .../examples/cookbook/group_with_coder_test.py | 4 +- .../examples/cookbook/mergecontacts.py | 2 +- .../examples/cookbook/mergecontacts_test.py | 3 +- .../examples/cookbook/multiple_output_pardo.py | 4 +- .../cookbook/multiple_output_pardo_test.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 65 ++++++++++---------- .../examples/snippets/snippets_test.py | 12 ++-- 
.../apache_beam/examples/streaming_wordcap.py | 2 +- .../apache_beam/examples/streaming_wordcount.py | 2 +- sdks/python/apache_beam/examples/wordcount.py | 1 + .../apache_beam/examples/wordcount_debugging.py | 4 +- .../apache_beam/examples/wordcount_minimal.py | 4 +- .../python/apache_beam/io/concat_source_test.py | 3 +- .../apache_beam/io/filebasedsource_test.py | 19 +++--- sdks/python/apache_beam/io/fileio_test.py | 11 ++-- sdks/python/apache_beam/io/sources_test.py | 3 +- sdks/python/apache_beam/io/textio_test.py | 26 ++++---- sdks/python/tox.ini | 1 + 38 files changed, 123 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/autocomplete_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 5ed4fb5..edf95f0 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -21,6 +21,7 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import autocomplete +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -30,7 +31,7 @@ class AutocompleteTest(unittest.TestCase): WORDS = ['this', 'this', 'that', 'to', 'to', 'to'] def test_top_prefixes(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() words = p | beam.Create(self.WORDS) result = words | autocomplete.TopPerPrefix(5) # values must be hashable for now http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/estimate_pi_test.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py index 0440ecc..10010cb 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -20,8 +20,8 @@ import logging import unittest -import apache_beam as beam from apache_beam.examples.complete import estimate_pi +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import DataflowAssertException @@ -38,7 +38,7 @@ def in_between(lower, upper): class EstimatePiTest(unittest.TestCase): def test_basics(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() result = p | 'Estimate' >> estimate_pi.EstimatePiTransform() # Note: Probabilistically speaking this test can fail with a probability http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 45fc1fb..30883dc 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -113,7 +113,7 @@ def run(argv=None): # pylint: disable=missing-docstring lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) | WriteToText(known_args.coordinate_output)) # pylint: enable=expression-not-assigned - p.run() + return p.run() # Optionally render the image and save it to a file. # TODO(silviuc): Add this functionality. 
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py index c13e857..c254eb4 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py @@ -52,7 +52,7 @@ class JuliaSetTest(unittest.TestCase): if image_file_name is not None: args.append('--image_output=%s' % image_file_name) - juliaset.run(args) + juliaset.run(args).wait_until_finish() def test_output_file_format(self): grid_size = 5 http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py index d6ba064..0db5431 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset_main.py @@ -38,7 +38,7 @@ an example: python juliaset_main.py \ --job_name juliaset-$USER \ --project YOUR-PROJECT \ - --runner BlockingDataflowRunner \ + --runner DataflowRunner \ --setup_file ./setup.py \ --staging_location gs://YOUR-BUCKET/juliaset/staging \ --temp_location gs://YOUR-BUCKET/juliaset/temp \ http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index 59b9d6f..4d6e0d3 100644 --- 
a/sdks/python/apache_beam/examples/complete/tfidf.py +++ b/sdks/python/apache_beam/examples/complete/tfidf.py @@ -200,7 +200,8 @@ def run(argv=None): # Write the output using a "Write" transform that has side effects. # pylint: disable=expression-not-assigned output | 'write' >> WriteToText(known_args.output) - p.run() + # Execute the pipeline and wait until it is completed. + p.run().wait_until_finish() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index deda4cb..404ab44 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -25,6 +25,7 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import tfidf +from apache_beam.test_pipeline import TestPipeline EXPECTED_RESULTS = set([ @@ -47,7 +48,7 @@ class TfIdfTest(unittest.TestCase): f.write(contents) def test_tfidf_transform(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() uri_to_line = p | beam.Create( 'create sample', [('1.txt', 'abc def ghi'), http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index 2dea642..4920813 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -31,7 +31,7 @@ pipeline configuration in addition to the above: --project YOUR_PROJECT_ID --staging_location gs://YOUR_STAGING_DIRECTORY 
--temp_location gs://YOUR_TEMPORARY_DIRECTORY - --runner BlockingDataflowRunner + --runner DataflowRunner The default input is gs://dataflow-samples/wikipedia_edits/*.json and can be overridden with --input. http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py index 1d25807..9b9d9b1 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py @@ -23,6 +23,7 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import top_wikipedia_sessions +from apache_beam.test_pipeline import TestPipeline class ComputeTopSessionsTest(unittest.TestCase): @@ -49,7 +50,7 @@ class ComputeTopSessionsTest(unittest.TestCase): ] def test_compute_top_sessions(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() edits = p | beam.Create(self.EDITS) result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 97c41d6..926f141 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -22,12 +22,13 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import bigquery_side_input +from apache_beam.test_pipeline import TestPipeline class 
BigQuerySideInputTest(unittest.TestCase): def test_create_groups(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() group_ids_pcoll = p | 'create_group_ids' >> beam.Create(['A', 'B', 'C']) corpus_pcoll = p | beam.Create('create_corpus', http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py index 2cb2c45..0fabe3f 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py @@ -36,7 +36,7 @@ class BigQueryTornadoesTest(unittest.TestCase): results = bigquery_tornadoes.count_tornadoes(rows) beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2}, {'month': 2, 'tornado_count': 1}])) - p.run() + p.run().wait_until_finish() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py index ceeefd6..b5eacce 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py +++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py @@ -87,7 +87,7 @@ def run(argv=None): known_args.checksum_output + '-output') # Actually run the pipeline (all operations above are deferred). 
- p.run() + return p.run() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py index d73c976..60b6acc 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle_test.py @@ -38,7 +38,7 @@ class BigShuffleTest(unittest.TestCase): bigshuffle.run([ '--input=%s*' % temp_path, '--output=%s.result' % temp_path, - '--checksum_output=%s.checksum' % temp_path]) + '--checksum_output=%s.checksum' % temp_path]).wait_until_finish() # Parse result file and compare. results = [] with open(temp_path + '.result-00000-of-00001') as result_file: http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py index ec0848f..4a92abb 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders_test.py +++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py @@ -22,6 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import coders +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -34,7 +35,7 @@ class CodersTest(unittest.TestCase): {'host': ['Brasil', 1], 'guest': ['Italy', 0]}] def test_compute_points(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() records = p | 'create' >> beam.Create(self.SAMPLE_RECORDS) result = (records | 'points' >> beam.FlatMap(coders.compute_points) 
http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py index 18bd3bc..a8ed555 100644 --- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py +++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py @@ -27,6 +27,7 @@ import logging import unittest import apache_beam as beam +from apache_beam.test_pipeline import TestPipeline class CombinersTest(unittest.TestCase): @@ -44,7 +45,7 @@ class CombinersTest(unittest.TestCase): can be used. """ result = ( - beam.Pipeline(runner=beam.runners.DirectRunner()) + TestPipeline() | beam.Create(CombinersTest.SAMPLE_DATA) | beam.CombinePerKey(sum)) @@ -60,7 +61,7 @@ class CombinersTest(unittest.TestCase): return result result = ( - beam.Pipeline(runner=beam.runners.DirectRunner()) + TestPipeline() | beam.Create(CombinersTest.SAMPLE_DATA) | beam.CombinePerKey(multiply)) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py index cfbb99d..67d1ff8 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py @@ -51,7 +51,7 @@ def run_count1(known_args, options): (p | beam.io.ReadFromText(known_args.input) | Count1() | beam.io.WriteToText(known_args.output)) - p.run() + p.run().wait_until_finish() @beam.ptransform_fn @@ -70,7 +70,7 @@ def run_count2(known_args, options): (p | ReadFromText(known_args.input) | Count2() # pylint: disable=no-value-for-parameter | WriteToText(known_args.output)) - 
p.run() + p.run().wait_until_finish() @beam.ptransform_fn http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py index 91309ae..cd1c04a 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py @@ -22,6 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import custom_ptransform +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -39,7 +40,7 @@ class CustomCountTest(unittest.TestCase): self.run_pipeline(custom_ptransform.Count3(factor), factor=factor) def run_pipeline(self, count_implementation, factor=1): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() words = p | beam.Create(['CAT', 'DOG', 'CAT', 'CAT', 'DOG']) result = words | count_implementation assert_that( http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 8f68fb4..25abb3e 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -141,7 +141,7 @@ def write_to_datastore(project, user_options, pipeline_options): | 'write to datastore' >> WriteToDatastore(project)) # Actually run the pipeline (all operations above are deferred). 
- p.run() + p.run().wait_until_finish() def make_ancestor_query(kind, namespace, ancestor): @@ -192,7 +192,10 @@ def read_from_datastore(project, user_options, pipeline_options): num_shards=user_options.num_shards) # Actually run the pipeline (all operations above are deferred). - return p.run() + result = p.run() + # Wait until completion, main thread would access post-completion job results. + result.wait_until_finish() + return result def run(argv=None): http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/filters_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py index 81dd30f..28bb1e1 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters_test.py +++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py @@ -22,6 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import filters +from apache_beam.test_pipeline import TestPipeline class FiltersTest(unittest.TestCase): @@ -35,7 +36,7 @@ class FiltersTest(unittest.TestCase): ] def _get_result_for_month(self, month): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() rows = (p | 'create' >> beam.Create(self.input_data)) results = filters.filter_cold_days(rows, month) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py index 78540d1..f6f2108 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py @@ -114,7 +114,7 @@ def run(argv=sys.argv[1:]): | beam.CombinePerKey(sum) | beam.Map(lambda (k, v): '%s,%d' 
% (k.name, v)) | WriteToText(known_args.output)) - p.run() + return p.run() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index 268ba8d..4e87966 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py @@ -50,7 +50,7 @@ class GroupWithCoderTest(unittest.TestCase): temp_path = self.create_temp_file(self.SAMPLE_RECORDS) group_with_coder.run([ '--input=%s*' % temp_path, - '--output=%s.result' % temp_path]) + '--output=%s.result' % temp_path]).wait_until_finish() # Parse result file and compare. results = [] with open(temp_path + '.result-00000-of-00001') as result_file: @@ -71,7 +71,7 @@ class GroupWithCoderTest(unittest.TestCase): group_with_coder.run([ '--no_pipeline_type_check', '--input=%s*' % temp_path, - '--output=%s.result' % temp_path]) + '--output=%s.result' % temp_path]).wait_until_finish() # Parse result file and compare. results = [] with open(temp_path + '.result-00000-of-00001') as result_file: http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 2475e02..6906ae4 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -124,7 +124,7 @@ def run(argv=None, assert_results=None): beam.assert_that(num_nomads, beam.equal_to([expected_nomads]), label='assert:nomads') # Execute pipeline. 
- p.run() + return p.run() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index b3be0dd..09f71d3 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -107,12 +107,13 @@ class MergeContactsTest(unittest.TestCase): result_prefix = self.create_temp_file('') - mergecontacts.run([ + result = mergecontacts.run([ '--input_email=%s' % path_email, '--input_phone=%s' % path_phone, '--input_snailmail=%s' % path_snailmail, '--output_tsv=%s.tsv' % result_prefix, '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3)) + result.wait_until_finish() with open('%s.tsv-00000-of-00001' % result_prefix) as f: contents = f.read() http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index 3acebc6..a877a1d 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -40,7 +40,7 @@ pipeline configuration: --staging_location gs://YOUR_STAGING_DIRECTORY --temp_location gs://YOUR_TEMP_DIRECTORY --job_name YOUR_JOB_NAME - --runner BlockingDataflowRunner + --runner DataflowRunner and an output prefix on GCS: --output gs://YOUR_OUTPUT_PREFIX @@ -173,7 +173,7 @@ def run(argv=None): | 'count words' >> CountWords() | 'write words' >> WriteToText(known_args.output + '-words')) - p.run() + return 
p.run() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 3ddd668..2c9111c 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -52,7 +52,7 @@ class MultipleOutputParDo(unittest.TestCase): multiple_output_pardo.run([ '--input=%s*' % temp_path, - '--output=%s' % result_prefix]) + '--output=%s' % result_prefix]).wait_until_finish() expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n'))) with open(result_prefix + '-chars-00000-of-00001') as f: http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 2eadc44..0cba1af 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -31,6 +31,7 @@ string. The tags can contain only letters, digits and _. """ import apache_beam as beam +from apache_beam.test_pipeline import TestPipeline # Quiet some pylint warnings that happen because of the somewhat special # format for the code snippets. @@ -88,6 +89,8 @@ def construct_pipeline(renames): p = beam.Pipeline(options=PipelineOptions()) # [END pipelines_constructing_creating] + p = TestPipeline() # Use TestPipeline for testing. 
+ # [START pipelines_constructing_reading] lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt') # [END pipelines_constructing_reading] @@ -106,8 +109,9 @@ def construct_pipeline(renames): p.visit(SnippetUtils.RenameFiles(renames)) # [START pipelines_constructing_running] - p.run() + result = p.run() # [END pipelines_constructing_running] + result def model_pipelines(argv): @@ -144,8 +148,9 @@ def model_pipelines(argv): | beam.combiners.Count.PerKey() | beam.io.WriteToText(my_options.output)) - p.run() + result = p.run() # [END model_pipelines] + result.wait_until_finish() def model_pcollection(argv): @@ -175,8 +180,9 @@ def model_pcollection(argv): 'Or to take arms against a sea of troubles, ']) | beam.io.WriteToText(my_options.output)) - p.run() + result = p.run() # [END model_pcollection] + result.wait_until_finish() def pipeline_options_remote(argv): @@ -206,8 +212,7 @@ def pipeline_options_remote(argv): options = PipelineOptions(flags=argv) # For Cloud execution, set the Cloud Platform project, job_name, - # staging location, temp_location and specify DataflowRunner or - # BlockingDataflowRunner. + # staging location, temp_location and specify DataflowRunner. google_cloud_options = options.view_as(GoogleCloudOptions) google_cloud_options.project = 'my-project-id' google_cloud_options.job_name = 'myjob' @@ -223,9 +228,7 @@ def pipeline_options_remote(argv): my_input = my_options.input my_output = my_options.output - # Overriding the runner for tests. - options.view_as(StandardOptions).runner = 'DirectRunner' - p = Pipeline(options=options) + p = TestPipeline() # Use TestPipeline for testing. lines = p | beam.io.ReadFromText(my_input) lines | beam.io.WriteToText(my_output) @@ -265,6 +268,7 @@ def pipeline_options_local(argv): p = Pipeline(options=options) # [END pipeline_options_local] + p = TestPipeline() # Use TestPipeline for testing. 
lines = p | beam.io.ReadFromText(my_input) lines | beam.io.WriteToText(my_output) p.run() @@ -288,7 +292,7 @@ def pipeline_options_command_line(argv): lines | 'WriteToText' >> beam.io.WriteToText(known_args.output) # [END pipeline_options_command_line] - p.run() + p.run().wait_until_finish() def pipeline_logging(lines, output): @@ -296,7 +300,6 @@ def pipeline_logging(lines, output): import re import apache_beam as beam - from apache_beam.utils.pipeline_options import PipelineOptions # [START pipeline_logging] # import Python logging module. @@ -316,7 +319,7 @@ def pipeline_logging(lines, output): # Remaining WordCount example code ... # [END pipeline_logging] - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. (p | beam.Create(lines) | beam.ParDo(ExtractWordsFn()) @@ -372,7 +375,7 @@ def pipeline_monitoring(renames): pipeline_options = PipelineOptions() options = pipeline_options.view_as(WordCountOptions) - p = beam.Pipeline(options=pipeline_options) + p = TestPipeline() # Use TestPipeline for testing. # [START pipeline_monitoring_execution] (p @@ -405,7 +408,7 @@ def examples_wordcount_minimal(renames): google_cloud_options.job_name = 'myjob' google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging' google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp' - options.view_as(StandardOptions).runner = 'BlockingDataflowRunner' + options.view_as(StandardOptions).runner = 'DataflowRunner' # [END examples_wordcount_minimal_options] # Run it locally for testing. 
@@ -441,8 +444,9 @@ def examples_wordcount_minimal(renames): p.visit(SnippetUtils.RenameFiles(renames)) # [START examples_wordcount_minimal_run] - p.run() + result = p.run() # [END examples_wordcount_minimal_run] + result.wait_until_finish() def examples_wordcount_wordcount(renames): @@ -497,7 +501,7 @@ def examples_wordcount_wordcount(renames): formatted | beam.io.WriteToText('gs://my-bucket/counts.txt') p.visit(SnippetUtils.RenameFiles(renames)) - p.run() + p.run().wait_until_finish() def examples_wordcount_debugging(renames): @@ -505,7 +509,6 @@ def examples_wordcount_debugging(renames): import re import apache_beam as beam - from apache_beam.utils.pipeline_options import PipelineOptions # [START example_wordcount_debugging_logging] # [START example_wordcount_debugging_aggregators] @@ -546,7 +549,7 @@ def examples_wordcount_debugging(renames): # [END example_wordcount_debugging_logging] # [END example_wordcount_debugging_aggregators] - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. 
filtered_words = ( p | beam.io.ReadFromText( @@ -649,7 +652,7 @@ def model_custom_source(count): lines, beam.equal_to( ['line ' + str(number) for number in range(0, count)])) - p.run() + p.run().wait_until_finish() # We recommend users to start Source classes with an underscore to discourage # using the Source class directly when a PTransform for the source is @@ -679,7 +682,7 @@ def model_custom_source(count): lines, beam.equal_to( ['line ' + str(number) for number in range(0, count)])) - p.run() + p.run().wait_until_finish() def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, @@ -781,7 +784,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, final_table_name)) # [END model_custom_sink_use_new_sink] - p.run() + p.run().wait_until_finish() # We recommend users to start Sink class names with an underscore to # discourage using the Sink class directly when a PTransform for the sink is @@ -812,7 +815,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform, 'http://url_to_simple_kv/', final_table_name) # [END model_custom_sink_use_ptransform] - p.run() + p.run().wait_until_finish() def model_textio(renames): @@ -841,7 +844,7 @@ def model_textio(renames): # [END model_textio_write] p.visit(SnippetUtils.RenameFiles(renames)) - p.run() + p.run().wait_until_finish() def model_datastoreio(): @@ -954,8 +957,7 @@ def model_composite_transform_example(contents, output_path): # [END composite_ptransform_apply_method] # [END composite_transform_example] - from apache_beam.utils.pipeline_options import PipelineOptions - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. 
(p | beam.Create(contents) | CountWords() @@ -967,8 +969,7 @@ def model_multiple_pcollections_flatten(contents, output_path): """Merging a PCollection with Flatten.""" some_hash_fn = lambda s: ord(s[0]) import apache_beam as beam - from apache_beam.utils.pipeline_options import PipelineOptions - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. partition_fn = lambda element, partitions: some_hash_fn(element) % partitions # Partition into deciles @@ -1005,8 +1006,7 @@ def model_multiple_pcollections_partition(contents, output_path): """Assume i in [0,100).""" return i import apache_beam as beam - from apache_beam.utils.pipeline_options import PipelineOptions - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. students = p | beam.Create(contents) @@ -1032,8 +1032,7 @@ def model_group_by_key(contents, output_path): import re import apache_beam as beam - from apache_beam.utils.pipeline_options import PipelineOptions - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. words_and_counts = ( p | beam.Create(contents) @@ -1056,8 +1055,7 @@ def model_group_by_key(contents, output_path): def model_co_group_by_key_tuple(email_list, phone_list, output_path): """Applying a CoGroupByKey Transform to a tuple.""" import apache_beam as beam - from apache_beam.utils.pipeline_options import PipelineOptions - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. # [START model_group_by_key_cogroupbykey_tuple] # Each data set is represented by key-value pairs in separate PCollections. # Both data sets share a common key type (in this example str). 
@@ -1094,9 +1092,8 @@ def model_join_using_side_inputs( import apache_beam as beam from apache_beam.pvalue import AsIter - from apache_beam.utils.pipeline_options import PipelineOptions - p = beam.Pipeline(options=PipelineOptions()) + p = TestPipeline() # Use TestPipeline for testing. # [START model_join_using_side_inputs] # This code performs a join by receiving the set of names as an input and # passing PCollections that contain emails and phone numbers as side inputs http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index ffe0f58..34863f1 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -20,7 +20,6 @@ import glob import logging import os -import sys import tempfile import unittest import uuid @@ -35,6 +34,7 @@ from apache_beam.utils.pipeline_options import TypeOptions from apache_beam.examples.snippets import snippets # pylint: disable=expression-not-assigned +from apache_beam.test_pipeline import TestPipeline class ParDoTest(unittest.TestCase): @@ -110,7 +110,7 @@ class ParDoTest(unittest.TestCase): self.assertEqual({1, 2, 4}, set(result)) def test_pardo_side_input(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() words = p | 'start' >> beam.Create(['a', 'bb', 'ccc', 'dddd']) # [START model_pardo_side_input] @@ -228,7 +228,7 @@ class ParDoTest(unittest.TestCase): class TypeHintsTest(unittest.TestCase): def test_bad_types(self): - p = beam.Pipeline('DirectRunner', argv=sys.argv) + p = TestPipeline() evens = None # pylint: disable=unused-variable # [START type_hints_missing_define_numbers] @@ -290,7 +290,7 @@ class TypeHintsTest(unittest.TestCase): def test_runtime_checks_off(self): # pylint: 
disable=expression-not-assigned - p = beam.Pipeline('DirectRunner', argv=sys.argv) + p = TestPipeline() # [START type_hints_runtime_off] p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str) p.run() @@ -298,7 +298,7 @@ class TypeHintsTest(unittest.TestCase): def test_runtime_checks_on(self): # pylint: disable=expression-not-assigned - p = beam.Pipeline('DirectRunner', argv=sys.argv) + p = TestPipeline() with self.assertRaises(typehints.TypeCheckError): # [START type_hints_runtime_on] p.options.view_as(TypeOptions).runtime_type_check = True @@ -307,7 +307,7 @@ class TypeHintsTest(unittest.TestCase): # [END type_hints_runtime_on] def test_deterministic_key(self): - p = beam.Pipeline('DirectRunner') + p = TestPipeline() lines = (p | beam.Create( ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcap.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcap.py b/sdks/python/apache_beam/examples/streaming_wordcap.py index d25ec3e..d0cc8a2 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcap.py +++ b/sdks/python/apache_beam/examples/streaming_wordcap.py @@ -56,7 +56,7 @@ def run(argv=None): transformed | beam.io.Write( beam.io.PubSubSink(known_args.output_topic)) - p.run() + p.run().wait_until_finish() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/streaming_wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py index 35c1abb..adfc33d 100644 --- a/sdks/python/apache_beam/examples/streaming_wordcount.py +++ b/sdks/python/apache_beam/examples/streaming_wordcount.py @@ -66,7 +66,7 @@ def run(argv=None): transformed | 
beam.io.Write( 'pubsub_write', beam.io.PubSubSink(known_args.output_topic)) - p.run() + p.run().wait_until_finish() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 211211d..92929af 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -97,6 +97,7 @@ def run(argv=None): # Actually run the pipeline (all operations above are deferred). result = p.run() + result.wait_until_finish() empty_line_values = result.aggregated_values(empty_line_aggregator) logging.info('number of empty lines: %d', sum(empty_line_values.values())) word_length_values = result.aggregated_values(average_word_size_aggregator) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 20d1c2f..ac13f35 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -32,7 +32,7 @@ pipeline configuration:: --staging_location gs://YOUR_STAGING_DIRECTORY --temp_location gs://YOUR_TEMP_DIRECTORY --job_name YOUR_JOB_NAME - --runner BlockingDataflowRunner + --runner DataflowRunner and an output prefix on GCS:: @@ -151,7 +151,7 @@ def run(argv=None): | 'write' >> WriteToText(known_args.output)) # Actually run the pipeline (all operations above are deferred). 
- p.run() + p.run().wait_until_finish() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/examples/wordcount_minimal.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py index 18595d0..b80ed84 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal.py @@ -73,7 +73,7 @@ def run(argv=None): help='Output file to write results to.') known_args, pipeline_args = parser.parse_known_args(argv) pipeline_args.extend([ - # CHANGE 2/5: (OPTIONAL) Change this to BlockingDataflowRunner to + # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to # run your pipeline on the Google Cloud Dataflow Service. '--runner=DirectRunner', # CHANGE 3/5: Your project ID is required in order to run your pipeline on @@ -113,7 +113,7 @@ def run(argv=None): output | 'write' >> WriteToText(known_args.output) # Actually run the pipeline (all operations above are deferred). 
- p.run() + p.run().wait_until_finish() if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/concat_source_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index 3ff2529..2cc4684 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -26,6 +26,7 @@ from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.io import source_test_utils from apache_beam.io.concat_source import ConcatSource +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -212,7 +213,7 @@ class ConcatSourceTest(unittest.TestCase): RangeSource(10, 100), RangeSource(100, 1000), ]) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.Read(source) assert_that(pcoll, equal_to(range(1000))) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 8f12627..7481c4c 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -38,6 +38,7 @@ from apache_beam.io.concat_source import ConcatSource from apache_beam.io.filebasedsource import _SingleFileSource as SingleFileSource from apache_beam.io.filebasedsource import FileBasedSource +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.util import assert_that @@ -364,7 
+365,7 @@ class TestFileBasedSource(unittest.TestCase): self.assertItemsEqual(expected_data, read_data) def _run_source_test(self, pattern, expected_data, splittable=True): - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( pattern, splittable=splittable)) assert_that(pcoll, equal_to(expected_data)) @@ -404,7 +405,7 @@ class TestFileBasedSource(unittest.TestCase): with bz2.BZ2File(filename, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, splittable=False, @@ -419,7 +420,7 @@ class TestFileBasedSource(unittest.TestCase): with gzip.GzipFile(filename, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, splittable=False, @@ -437,7 +438,7 @@ class TestFileBasedSource(unittest.TestCase): compressed_chunks.append( compressobj.compress('\n'.join(c)) + compressobj.flush()) file_pattern = write_prepared_pattern(compressed_chunks) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( file_pattern, splittable=False, @@ -456,7 +457,7 @@ class TestFileBasedSource(unittest.TestCase): f.write('\n'.join(c)) compressed_chunks.append(out.getvalue()) file_pattern = write_prepared_pattern(compressed_chunks) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( file_pattern, splittable=False, @@ -471,7 +472,7 @@ class TestFileBasedSource(unittest.TestCase): with bz2.BZ2File(filename, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, compression_type=fileio.CompressionTypes.AUTO)) @@ -485,7 +486,7 @@ class 
TestFileBasedSource(unittest.TestCase): with gzip.GzipFile(filename, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, compression_type=fileio.CompressionTypes.AUTO)) @@ -504,7 +505,7 @@ class TestFileBasedSource(unittest.TestCase): compressed_chunks.append(out.getvalue()) file_pattern = write_prepared_pattern( compressed_chunks, suffixes=['.gz']*len(chunks)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( file_pattern, compression_type=fileio.CompressionTypes.AUTO)) @@ -526,7 +527,7 @@ class TestFileBasedSource(unittest.TestCase): chunks_to_write.append('\n'.join(c)) file_pattern = write_prepared_pattern(chunks_to_write, suffixes=(['.gz', '']*3)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( file_pattern, compression_type=fileio.CompressionTypes.AUTO)) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 68e2bce..8842369 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -34,6 +34,7 @@ import apache_beam as beam from apache_beam import coders from apache_beam.io import fileio from apache_beam.runners.dataflow.native_io import iobase as dataflow_io +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher @@ -760,7 +761,7 @@ class TestNativeTextFileSink(unittest.TestCase): self.assertEqual(f.read().splitlines(), []) def test_write_native(self): - pipeline = beam.Pipeline('DirectRunner') + pipeline = 
TestPipeline() pcoll = pipeline | beam.core.Create('Create', self.lines) pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned pipeline.run() @@ -773,7 +774,7 @@ class TestNativeTextFileSink(unittest.TestCase): self.assertEqual(read_result, self.lines) def test_write_native_auto_compression(self): - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.core.Create('Create', self.lines) pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( @@ -788,7 +789,7 @@ class TestNativeTextFileSink(unittest.TestCase): self.assertEqual(read_result, self.lines) def test_write_native_auto_compression_unsharded(self): - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.core.Create('Create', self.lines) pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned fileio.NativeTextFileSink( @@ -880,7 +881,7 @@ class TestFileSink(unittest.TestCase): temp_path = tempfile.NamedTemporaryFile().name sink = MyFileSink( temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder()) - p = beam.Pipeline('DirectRunner') + p = TestPipeline() p | beam.Create([]) | beam.io.Write(sink) # pylint: disable=expression-not-assigned p.run() self.assertEqual( @@ -894,7 +895,7 @@ class TestFileSink(unittest.TestCase): num_shards=3, shard_name_template='_NN_SSS_', coder=coders.ToStringCoder()) - p = beam.Pipeline('DirectRunner') + p = TestPipeline() p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned p.run() http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/sources_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py index f0f2046..dc0fd54 100644 --- a/sdks/python/apache_beam/io/sources_test.py +++ 
b/sdks/python/apache_beam/io/sources_test.py @@ -27,6 +27,7 @@ import apache_beam as beam from apache_beam import coders from apache_beam.io import iobase from apache_beam.io import range_trackers +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to @@ -98,7 +99,7 @@ class SourcesTest(unittest.TestCase): def test_run_direct(self): file_name = self._create_temp_file('aaaa\nbbbb\ncccc\ndddd') - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.Read(LineSource(file_name)) assert_that(pcoll, equal_to(['aaaa', 'bbbb', 'cccc', 'dddd'])) http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index 877e190..07ab4cc 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -43,6 +43,8 @@ from apache_beam.io.filebasedsource_test import write_data from apache_beam.io.filebasedsource_test import write_pattern from apache_beam.io.fileio import CompressionTypes +from apache_beam.test_pipeline import TestPipeline + from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher @@ -304,7 +306,7 @@ class TextSourceTest(unittest.TestCase): def test_dataflow_single_file(self): file_name, expected_data = write_data(5) assert len(expected_data) == 5 - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText(file_name) assert_that(pcoll, equal_to(expected_data)) pipeline.run() @@ -319,7 +321,7 @@ class TextSourceTest(unittest.TestCase): file_name, expected_data = write_data(5) assert len(expected_data) == 5 - pipeline = beam.Pipeline('DirectRunner') + pipeline = 
TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText(file_name, coder=DummyCoder()) assert_that(pcoll, equal_to([record * 2 for record in expected_data])) pipeline.run() @@ -327,7 +329,7 @@ class TextSourceTest(unittest.TestCase): def test_dataflow_file_pattern(self): pattern, expected_data = write_pattern([5, 3, 12, 8, 8, 4]) assert len(expected_data) == 40 - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText(pattern) assert_that(pcoll, equal_to(expected_data)) pipeline.run() @@ -339,7 +341,7 @@ class TextSourceTest(unittest.TestCase): with bz2.BZ2File(file_name, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText(file_name) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -351,7 +353,7 @@ class TextSourceTest(unittest.TestCase): with gzip.GzipFile(file_name, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText(file_name) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -363,7 +365,7 @@ class TextSourceTest(unittest.TestCase): with bz2.BZ2File(file_name, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText( file_name, compression_type=CompressionTypes.BZIP2) @@ -377,7 +379,7 @@ class TextSourceTest(unittest.TestCase): with gzip.GzipFile(file_name, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText( file_name, 0, CompressionTypes.GZIP, @@ -392,7 +394,7 @@ class TextSourceTest(unittest.TestCase): with gzip.GzipFile(file_name, 'wb') as f: f.write('\n'.join(lines)) - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText( file_name, 0, 
CompressionTypes.GZIP, @@ -425,7 +427,7 @@ class TextSourceTest(unittest.TestCase): def test_read_gzip_empty_file(self): filename = tempfile.NamedTemporaryFile( delete=False, prefix=tempfile.template).name - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | 'Read' >> ReadFromText( filename, 0, CompressionTypes.GZIP, @@ -521,7 +523,7 @@ class TextSinkTest(unittest.TestCase): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_write_dataflow(self): - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.core.Create('Create', self.lines) pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned pipeline.run() @@ -534,7 +536,7 @@ class TextSinkTest(unittest.TestCase): self.assertEqual(read_result, self.lines) def test_write_dataflow_auto_compression(self): - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.core.Create('Create', self.lines) pcoll | 'Write' >> WriteToText(self.path, file_name_suffix='.gz') # pylint: disable=expression-not-assigned pipeline.run() @@ -547,7 +549,7 @@ class TextSinkTest(unittest.TestCase): self.assertEqual(read_result, self.lines) def test_write_dataflow_auto_compression_unsharded(self): - pipeline = beam.Pipeline('DirectRunner') + pipeline = TestPipeline() pcoll = pipeline | beam.core.Create('Create', self.lines) pcoll | 'Write' >> WriteToText(self.path + '.gz', shard_name_template='') # pylint: disable=expression-not-assigned pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/703c1bc1/sdks/python/tox.ini ---------------------------------------------------------------------- diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 0110273..fa78d6f 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -25,6 +25,7 @@ select = E3 [testenv:py27] deps= + nose pep8 pylint commands =
