Repository: beam Updated Branches: refs/heads/master 65135fd7a -> 70efdd0fe
[BEAM-1715] Fix Python WordCount on Dataflow Mismatch Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa6f5f0f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa6f5f0f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa6f5f0f Branch: refs/heads/master Commit: fa6f5f0f45a3cb8343d0a30dac8f75a8097d65d1 Parents: 65135fd Author: Mark Liu <[email protected]> Authored: Tue Mar 14 12:43:45 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Wed Mar 22 17:34:18 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/wordcount_it_test.py | 5 +++- .../apache_beam/tests/pipeline_verifiers.py | 31 +++++++++++++++++++- .../tests/pipeline_verifiers_test.py | 21 +++++++++++++ sdks/python/run_postcommit.sh | 3 +- 4 files changed, 57 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/examples/wordcount_it_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index 77926bb..1c700b6 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -43,9 +43,12 @@ class WordCountIT(unittest.TestCase): output = '/'.join([test_pipeline.get_option('output'), test_pipeline.get_option('job_name'), 'results']) + arg_sleep_secs = test_pipeline.get_option('sleep_secs') + sleep_secs = int(arg_sleep_secs) if arg_sleep_secs is not None else None pipeline_verifiers = [PipelineStateMatcher(), FileChecksumMatcher(output + '*-of-*', - self.DEFAULT_CHECKSUM)] + self.DEFAULT_CHECKSUM, + sleep_secs)] extra_opts = {'output': output, 'on_success_matcher': all_of(*pipeline_verifiers)} 
http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/tests/pipeline_verifiers.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py index 0d6814e..3cac658 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py @@ -23,6 +23,7 @@ of test pipeline job. Customized verifier should extend """ import logging +import time from hamcrest.core.base_matcher import BaseMatcher @@ -79,7 +80,27 @@ class FileChecksumMatcher(BaseMatcher): is a hash string computed from content of file(s). """ - def __init__(self, file_path, expected_checksum): + def __init__(self, file_path, expected_checksum, sleep_secs=None): + """Initialize a FileChecksumMatcher object + + Args: + file_path : A string that is the full path of output file. This path + can contain globs. + expected_checksum : A hash string that is computed from expected + result. + sleep_secs : Number of seconds to wait before verification starts. + Extra time is given to make sure output files are ready on FS. + """ + if sleep_secs is not None: + if isinstance(sleep_secs, int): + self.sleep_secs = sleep_secs + else: + raise ValueError('Sleep seconds, if received, must be int. 
' + 'But received: %r, %s' % (sleep_secs, + type(sleep_secs))) + else: + self.sleep_secs = None + self.file_path = file_path self.file_system = get_filesystem(self.file_path) self.expected_checksum = expected_checksum @@ -94,6 +115,9 @@ class FileChecksumMatcher(BaseMatcher): matched_path = [f.path for f in match_result.metadata_list] if not matched_path: raise IOError('No such file or directory: %s' % self.file_path) + + logging.info('Find %d files in %s: \n%s', + len(matched_path), self.file_path, '\n'.join(matched_path)) for path in matched_path: with self.file_system.open(path, 'r') as f: for line in f: @@ -101,6 +125,11 @@ class FileChecksumMatcher(BaseMatcher): return read_lines def _matches(self, _): + if self.sleep_secs: + # Wait to have output file ready on FS + logging.info('Wait %d seconds...', self.sleep_secs) + time.sleep(self.sleep_secs) + # Read from given file(s) path read_lines = self._read_with_retry() http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/tests/pipeline_verifiers_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py index af8f441..909917d 100644 --- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py +++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py @@ -121,6 +121,27 @@ class PipelineVerifiersTest(unittest.TestCase): self.assertTrue(mock_match.called) self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count) + def test_file_checksum_matchcer_invalid_sleep_time(self): + with self.assertRaises(ValueError) as cm: + verifiers.FileChecksumMatcher('file_path', + 'expected_checksum', + 'invalid_sleep_time') + self.assertEqual(cm.exception.message, + 'Sleep seconds, if received, must be int. 
' + 'But received: \'invalid_sleep_time\', ' + '<type \'str\'>') + + @patch('time.sleep', return_value=None) + def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep): + temp_dir = tempfile.mkdtemp() + case = self.test_cases[0] + self.create_temp_file(case['content'], temp_dir) + matcher = verifiers.FileChecksumMatcher(temp_dir + '/*', + case['expected_checksum'], + 10) + hc_assert_that(self._mock_result, matcher) + self.assertTrue(mocked_sleep.called) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/run_postcommit.sh ---------------------------------------------------------------------- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 4d17942..50338e2 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -101,4 +101,5 @@ python setup.py nosetests \ --output=$GCS_LOCATION/py-wordcount-cloud/output \ --sdk_location=$SDK_LOCATION \ --job_name=$JOBNAME_E2E_WC \ - --num_workers=1" + --num_workers=1 \ + --sleep_secs=20"
