Migrate Python tests to not depend on fixed sharding for file output
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5c257d5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5c257d5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5c257d5 Branch: refs/heads/DSL_SQL Commit: b5c257d5fa2e3445a37a8154bde706392c23c305 Parents: 513c952 Author: Charles Chen <[email protected]> Authored: Mon Jun 5 16:31:13 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue Jun 6 13:55:13 2017 -0700 ---------------------------------------------------------------------- .../complete/juliaset/juliaset/juliaset_test.py | 5 +++-- .../apache_beam/examples/complete/tfidf_test.py | 5 +++-- .../examples/cookbook/group_with_coder_test.py | 5 +++-- .../examples/cookbook/mergecontacts_test.py | 3 ++- .../examples/cookbook/multiple_output_pardo_test.py | 11 ++++++----- .../examples/wordcount_debugging_test.py | 3 ++- .../apache_beam/examples/wordcount_minimal_test.py | 3 ++- sdks/python/apache_beam/examples/wordcount_test.py | 3 ++- sdks/python/apache_beam/testing/util.py | 16 ++++++++++++++++ 9 files changed, 39 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py index 17d9cf3..91c75aa 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test.py @@ -25,6 +25,7 @@ import unittest from apache_beam.examples.complete.juliaset.juliaset import juliaset +from apache_beam.testing.util import open_shards class JuliaSetTest(unittest.TestCase): @@ -60,8 
+61,8 @@ class JuliaSetTest(unittest.TestCase): # Parse the results from the file, and ensure it was written in the proper # format. - with open(self.test_files['output_coord_file_name'] + - '-00000-of-00001') as result_file: + with open_shards(self.test_files['output_coord_file_name'] + + '-*-of-*') as result_file: output_lines = result_file.readlines() # Should have a line for each x-coordinate. http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index 322426f..b6f8825 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -28,6 +28,7 @@ from apache_beam.examples.complete import tfidf from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.testing.util import open_shards EXPECTED_RESULTS = set([ @@ -76,8 +77,8 @@ class TfIdfTest(unittest.TestCase): '--output', os.path.join(temp_folder, 'result')]) # Parse result file and compare. 
results = [] - with open(os.path.join(temp_folder, - 'result-00000-of-00001')) as result_file: + with open_shards(os.path.join( + temp_folder, 'result-*-of-*')) as result_file: for line in result_file: match = re.search(EXPECTED_LINE_RE, line) logging.info('Result line: %s', line) http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index 268ba8d..fb630ba 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py @@ -22,6 +22,7 @@ import tempfile import unittest from apache_beam.examples.cookbook import group_with_coder +from apache_beam.testing.util import open_shards # Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was @@ -53,7 +54,7 @@ class GroupWithCoderTest(unittest.TestCase): '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: name, points = line.split(',') results.append((name, int(points))) @@ -74,7 +75,7 @@ class GroupWithCoderTest(unittest.TestCase): '--output=%s.result' % temp_path]) # Parse result file and compare. 
results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: name, points = line.split(',') results.append((name, int(points))) http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py index b3be0dd..32a3d51 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py @@ -22,6 +22,7 @@ import tempfile import unittest from apache_beam.examples.cookbook import mergecontacts +from apache_beam.testing.util import open_shards class MergeContactsTest(unittest.TestCase): @@ -114,7 +115,7 @@ class MergeContactsTest(unittest.TestCase): '--output_tsv=%s.tsv' % result_prefix, '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3)) - with open('%s.tsv-00000-of-00001' % result_prefix) as f: + with open_shards('%s.tsv-*-of-*' % result_prefix) as f: contents = f.read() self.assertEqual(self.EXPECTED_TSV, self.normalize_tsv_results(contents)) http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py index 3ddd668..1051106 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py @@ -23,6 +23,7 @@ import tempfile import unittest from apache_beam.examples.cookbook import multiple_output_pardo +from 
apache_beam.testing.util import open_shards class MultipleOutputParDo(unittest.TestCase): @@ -37,9 +38,9 @@ class MultipleOutputParDo(unittest.TestCase): f.write(contents) return f.name - def get_wordcount_results(self, temp_path): + def get_wordcount_results(self, result_path): results = [] - with open(temp_path) as result_file: + with open_shards(result_path) as result_file: for line in result_file: match = re.search(r'([A-Za-z]+): ([0-9]+)', line) if match is not None: @@ -55,15 +56,15 @@ class MultipleOutputParDo(unittest.TestCase): '--output=%s' % result_prefix]) expected_char_count = len(''.join(self.SAMPLE_TEXT.split('\n'))) - with open(result_prefix + '-chars-00000-of-00001') as f: + with open_shards(result_prefix + '-chars-*-of-*') as f: contents = f.read() self.assertEqual(expected_char_count, int(contents)) short_words = self.get_wordcount_results( - result_prefix + '-short-words-00000-of-00001') + result_prefix + '-short-words-*-of-*') self.assertEqual(sorted(short_words), sorted(self.EXPECTED_SHORT_WORDS)) - words = self.get_wordcount_results(result_prefix + '-words-00000-of-00001') + words = self.get_wordcount_results(result_prefix + '-words-*-of-*') self.assertEqual(sorted(words), sorted(self.EXPECTED_WORDS)) http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/wordcount_debugging_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging_test.py b/sdks/python/apache_beam/examples/wordcount_debugging_test.py index 900a8e7..92ee240 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging_test.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging_test.py @@ -23,6 +23,7 @@ import tempfile import unittest from apache_beam.examples import wordcount_debugging +from apache_beam.testing.util import open_shards class WordCountTest(unittest.TestCase): @@ -36,7 +37,7 @@ class WordCountTest(unittest.TestCase): def 
get_results(self, temp_path): results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: match = re.search(r'([A-Za-z]+): ([0-9]+)', line) if match is not None: http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/wordcount_minimal_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_minimal_test.py b/sdks/python/apache_beam/examples/wordcount_minimal_test.py index 82bace4..5ee7b78 100644 --- a/sdks/python/apache_beam/examples/wordcount_minimal_test.py +++ b/sdks/python/apache_beam/examples/wordcount_minimal_test.py @@ -24,6 +24,7 @@ import tempfile import unittest from apache_beam.examples import wordcount_minimal +from apache_beam.testing.util import open_shards class WordCountMinimalTest(unittest.TestCase): @@ -46,7 +47,7 @@ class WordCountMinimalTest(unittest.TestCase): '--output=%s.result' % temp_path]) # Parse result file and compare. 
results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: match = re.search(r'([a-z]+): ([0-9]+)', line) if match is not None: http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/examples/wordcount_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_test.py b/sdks/python/apache_beam/examples/wordcount_test.py index 616540b..9834ba5 100644 --- a/sdks/python/apache_beam/examples/wordcount_test.py +++ b/sdks/python/apache_beam/examples/wordcount_test.py @@ -24,6 +24,7 @@ import tempfile import unittest from apache_beam.examples import wordcount +from apache_beam.testing.util import open_shards class WordCountTest(unittest.TestCase): @@ -45,7 +46,7 @@ class WordCountTest(unittest.TestCase): '--output=%s.result' % temp_path]) # Parse result file and compare. results = [] - with open(temp_path + '.result-00000-of-00001') as result_file: + with open_shards(temp_path + '.result-*-of-*') as result_file: for line in result_file: match = re.search(r'([a-z]+): ([0-9]+)', line) if match is not None: http://git-wip-us.apache.org/repos/asf/beam/blob/b5c257d5/sdks/python/apache_beam/testing/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py index 60a6b21..959f25f 100644 --- a/sdks/python/apache_beam/testing/util.py +++ b/sdks/python/apache_beam/testing/util.py @@ -19,6 +19,9 @@ from __future__ import absolute_import +import glob +import tempfile + from apache_beam import pvalue from apache_beam.transforms import window from apache_beam.transforms.core import Create @@ -26,12 +29,15 @@ from apache_beam.transforms.core import Map from apache_beam.transforms.core import WindowInto from apache_beam.transforms.util import CoGroupByKey from 
apache_beam.transforms.ptransform import PTransform +from apache_beam.utils.annotations import experimental __all__ = [ 'assert_that', 'equal_to', 'is_empty', + # open_shards is internal and has no backwards compatibility guarantees. + 'open_shards', ] @@ -105,3 +111,13 @@ def assert_that(actual, matcher, label='assert_that'): return label actual | AssertThat() # pylint: disable=expression-not-assigned + + +@experimental() +def open_shards(glob_pattern): + """Returns a composite file of all shards matching the given glob pattern.""" + with tempfile.NamedTemporaryFile(delete=False) as f: + for shard in glob.glob(glob_pattern): + f.write(file(shard).read()) + concatenated_file_name = f.name + return file(concatenated_file_name, 'rb')
